From d551a1a3e7f9259a495dba44f315010ef2093402 Mon Sep 17 00:00:00 2001 From: wxyu Date: Tue, 13 Aug 2019 11:24:04 +0800 Subject: [PATCH 1/4] MS-336 scheduler interface Former-commit-id: 407e0af036d417a22e6002906773344fb6559818 --- cpp/src/scheduler/CacheMgr.h | 24 +++ cpp/src/scheduler/Cost.h | 46 +++++ cpp/src/scheduler/ResourceFactory.h | 41 ++++ cpp/src/scheduler/ResourceMgr.h | 141 +++++++++++++ cpp/src/scheduler/Scheduler.h | 155 +++++++++++++++ cpp/src/scheduler/Task.h | 29 +++ cpp/src/scheduler/TaskTable.h | 164 ++++++++++++++++ cpp/src/scheduler/resource/Connection.h | 47 +++++ cpp/src/scheduler/resource/CpuResource.h | 47 +++++ cpp/src/scheduler/resource/DiskResource.h | 21 ++ cpp/src/scheduler/resource/GpuResource.h | 21 ++ cpp/src/scheduler/resource/Node.h | 55 ++++++ cpp/src/scheduler/resource/Resource.h | 229 ++++++++++++++++++++++ cpp/unittest/CMakeLists.txt | 1 + cpp/unittest/scheduler/CMakeLists.txt | 36 ++++ cpp/unittest/scheduler/normal_test.cpp | 39 ++++ 16 files changed, 1096 insertions(+) create mode 100644 cpp/src/scheduler/CacheMgr.h create mode 100644 cpp/src/scheduler/Cost.h create mode 100644 cpp/src/scheduler/ResourceFactory.h create mode 100644 cpp/src/scheduler/ResourceMgr.h create mode 100644 cpp/src/scheduler/Scheduler.h create mode 100644 cpp/src/scheduler/Task.h create mode 100644 cpp/src/scheduler/TaskTable.h create mode 100644 cpp/src/scheduler/resource/Connection.h create mode 100644 cpp/src/scheduler/resource/CpuResource.h create mode 100644 cpp/src/scheduler/resource/DiskResource.h create mode 100644 cpp/src/scheduler/resource/GpuResource.h create mode 100644 cpp/src/scheduler/resource/Node.h create mode 100644 cpp/src/scheduler/resource/Resource.h create mode 100644 cpp/unittest/scheduler/CMakeLists.txt create mode 100644 cpp/unittest/scheduler/normal_test.cpp diff --git a/cpp/src/scheduler/CacheMgr.h b/cpp/src/scheduler/CacheMgr.h new file mode 100644 index 0000000000..321cbbb8a4 --- /dev/null +++ b/cpp/src/scheduler/CacheMgr.h @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +// dummy cache_mgr +class CacheMgr { + +}; + +using CacheMgrPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h new file mode 100644 index 0000000000..dc52eacaa3 --- /dev/null +++ b/cpp/src/scheduler/Cost.h @@ -0,0 +1,46 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include "Task.h" +#include "CacheMgr.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +// TODO: Policy interface +// TODO: collect statistics + +/* + * select tasks to move; + * call from scheduler; + */ +std::vector +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {} + + +/* + * select task to load + * call from resource; + * I DONT SURE NEED THIS; + */ +std::vector +PickToLoad(TaskTable task_table, uint64_t limit) {} + +/* + * select task to execute; + * call from resource; + * I DONT SURE NEED THIS; + */ +std::vector +PickToExecute(TaskTable task_table, uint64_t limit) {} + + +} +} +} diff --git a/cpp/src/scheduler/ResourceFactory.h b/cpp/src/scheduler/ResourceFactory.h new file mode 100644 index 0000000000..f327522975 --- /dev/null +++ b/cpp/src/scheduler/ResourceFactory.h @@ -0,0 +1,41 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + +#include "resource/Resource.h" +#include "resource/CpuResource.h" +#include "resource/GpuResource.h" +#include "resource/DiskResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class ResourceFactory { +public: + static std::shared_ptr + Create(const std::string &name, const std::string &alias = "") { + if (name == "disk") { + return std::make_shared(alias); + } else if (name == "cpu") { + return std::make_shared(alias); + } else if (name == "gpu") { + return std::make_shared(alias); + } else { + return nullptr; + } + } +}; + + +} +} +} + diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h new file mode 100644 index 0000000000..e1cc9110bf --- /dev/null +++ b/cpp/src/scheduler/ResourceMgr.h @@ -0,0 +1,141 @@ + +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class ResourceMgr { +public: + ResourceMgr() : running_(false) {} + + /******** Management Interface ********/ + + /* + * Add resource into Resource Management; + * Generate functions on events; + * Functions only modify bool variable, like event trigger; + */ + ResourceWPtr + Add(ResourcePtr &&resource) { + ResourceWPtr ret(resource); + resources_.emplace_back(resource); + +// resource->RegisterOnStartUp([] { +// start_up_event_[index] = true; +// }); +// resource.RegisterOnFinishTask([] { +// finish_task_event_[index] = true; +// }); + return ret; + } + + /* + * Create connection between A and B; + */ + void + Connect(ResourceWPtr &A, ResourceWPtr &B, Connection &connection) { + if (auto observe_a = A.lock()) { + if (auto observe_b = B.lock()) { + observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); + } + } + } + + /* + * Synchronous start all resource; + * Last, start event process thread; + */ + void + StartAll() { + for (auto &resource : resources_) { + resource->Start(); + } + worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); + } + + // TODO: add stats interface(low) + +public: + /******** Event Register Interface ********/ + + /* + * Register on start up event; + */ + void + RegisterOnStartUp(std::function &func) { + on_start_up_ = func; + } + + /* + * Register on finish one task event; + */ + void + RegisterOnFinishTask(std::function &func) { + on_finish_task_ = func; + } + + /* + * Register on copy task data completed event; + */ + void + RegisterOnCopyCompleted(std::function &func); + + /* + * Register on task table updated event; + */ + void + RegisterOnTaskTableUpdated(std::function &func); + +public: + /******** Utlitity Functions ********/ + + std::string + Dump(); + +private: + void + EventProcess() { + while (running_) { + for (uint64_t i = 0; i < resources_.size(); ++i) { + if (start_up_event_[i]) { + on_start_up_(resources_[i]); + } + } + } + + } + +private: + bool running_; + + std::vector resources_; + std::thread worker_thread_; + + std::vector start_up_event_; + std::vector finish_task_event_; + std::vector copy_completed_event_; + std::vector task_table_updated_event_; + + std::function on_start_up_; + std::function on_finish_task_; + std::function on_copy_completed_; + std::function on_task_table_updated_; +}; + +using ResourceMgrWPtr = std::weak_ptr; + +} +} +} + diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h new file mode 100644 index 0000000000..4bb840a68d --- /dev/null +++ b/cpp/src/scheduler/Scheduler.h @@ -0,0 +1,155 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Event { +public: + explicit + Event(ResourceWPtr &resource) + : resource_(resource) {} + +public: + virtual void + Process() = 0; + +private: + ResourceWPtr resource_; +}; + +using EventPtr = std::shared_ptr; + +class StartUpEvent : public Event { +public: + explicit + StartUpEvent(ResourceWPtr &resource) + : Event(resource) {} + +public: + void + Process() override; +}; + +class FinishTaskEvent : public Event { +public: + explicit + FinishTaskEvent(ResourceWPtr &resource) + : Event(resource) {} + +public: + void + Process() override { +// for (nei : res->neighbours) { +// tasks = cost(nei->task_table(), nei->connection, limit = 3) +// res->task_table()->PutTasks(tasks); +// } +// res->WakeUpExec(); + } +}; + +class CopyCompletedEvent : public Event { +public: + explicit + CopyCompletedEvent(ResourceWPtr &resource) + : Event(resource) {} + +public: + void + Process() override; +}; + +class TaskTableUpdatedEvent : public Event { +public: + explicit + TaskTableUpdatedEvent(ResourceWPtr &resource) + : Event(resource) {} + +public: + void + Process() override; +}; + +class Scheduler { +public: + explicit + Scheduler(ResourceMgrWPtr res_mgr) + : running_(false), + res_mgr_(std::move(res_mgr)) { +// res_mgr.Register(); +// res_mgr.Register(); +// res_mgr.Register(); +// res_mgr.Register(); + } + + void + Start() {} + + /******** Events ********/ + + /* + * Process start up events; + */ + void + OnStartUp(ResourceWPtr &resource) { + // call from res_mgr, non-blocking, if queue size over limit, exception! + auto event = std::make_shared(resource); + event_queue_.push(event); + } + + /* + * Process finish task events; + */ + void + OnFinishTask(ResourceWPtr); + + /* + * Process copy completed events; + */ + void + OnCopyCompleted(ResourceWPtr); + + /* + * Process task table updated events; + */ + void + OnTaskTableUpdated(ResourceWPtr); + + +public: + std::string + Dump(); + + +private: + void + worker_function() { + while (running_) { + auto event = event_queue_.front(); + event->Process(); + } + } + +private: + bool running_; + + ResourceMgrWPtr res_mgr_; + std::queue event_queue_; + std::thread worker_thread_; +}; + +} +} +} + diff --git a/cpp/src/scheduler/Task.h b/cpp/src/scheduler/Task.h new file mode 100644 index 0000000000..e7f33049e5 --- /dev/null +++ b/cpp/src/scheduler/Task.h @@ -0,0 +1,29 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +// dummy task +class Task { +public: + Task(const std::string &name) {} + + void + Execute() {} +}; + +using TaskPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h new file mode 100644 index 0000000000..567aa6214d --- /dev/null +++ b/cpp/src/scheduler/TaskTable.h @@ -0,0 +1,164 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include + +#include "Task.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class TaskTableItemState { + INVALID, + START, // idle + LOADING, // loading data from other resource + LOADED, // ready to exec or move + EXECUTING, // executing, locking util executed or failed + EXECUTED, // executed, termination state + MOVING, // moving to another resource, locking util executed or failed + MOVED, // moved, termination state +}; + +struct TaskTableItem { + TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {} + + TaskTableItem(const TaskTableItem &src) + : id(src.id), state(src.state), mutex(), priority(src.priority) {} + + uint64_t id; // auto increment from 0; + // TODO: add tag into task + TaskPtr task; // the task; + TaskTableItemState state; // the state; + std::mutex mutex; + + uint8_t priority; // just a number, meaningless; +}; + +class TaskTable { +public: + TaskTable() = default; + + explicit + TaskTable(std::vector &&tasks) {} + + /* + * Put one task; + */ + void + Put(TaskPtr task) {} + + /* + * Put tasks back of task table; + * Called by DBImpl; + */ + void + Put(std::vector &tasks) {} + + /* + * Return task table item reference; + */ + TaskTableItem & + Get(uint64_t index) {} + + /* + * TODO + * Remove sequence task which is DONE or MOVED from front; + * Called by ? + */ + void + Clear() { + // find first task is NOT (done or moved), erase from begin to it; +// auto iterator = table_.begin(); +// while (iterator->state == TaskTableItemState::EXECUTED or +// iterator->state == TaskTableItemState::MOVED) +// iterator++; +// table_.erase(table_.begin(), iterator); + } + + +public: + + /******** Action ********/ + /* + * Move a task; + * Set state moving; + * Called by scheduler; + */ + + // TODO: bool to Status + bool + Move(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task.mutex); + if (task.state == TaskTableItemState::START) { + task.state = TaskTableItemState::LOADING; + return true; + } + return false; + } + + /* + * Move task finished; + * Set state moved; + * Called by scheduler; + */ + bool + Moved(uint64_t index) {} + + /* + * Load a task; + * Set state loading; + * Called by loader; + */ + bool + Load(uint64_t index) {} + + /* + * Load task finished; + * Set state loaded; + * Called by loader; + */ + bool + Loaded(uint64_t index) {} + + /* + * Execute a task; + * Set state executing; + * Called by executor; + */ + bool + Execute(uint64_t index) {} + + /* + * Execute task finished; + * Set state executed; + * Called by executor; + */ + bool + Executed(uint64_t index) {} + +public: + /* + * Dump; + */ + std::string + Dump(); + +private: + // TODO: map better ? + std::deque table_; +}; + + +} +} +} diff --git a/cpp/src/scheduler/resource/Connection.h b/cpp/src/scheduler/resource/Connection.h new file mode 100644 index 0000000000..0f1088e7fe --- /dev/null +++ b/cpp/src/scheduler/resource/Connection.h @@ -0,0 +1,47 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Connection { +public: + Connection(std::string name, double speed) + : name_(std::move(name)), speed_(speed) {} + + const std::string & + get_name() const { + return name_; + } + + const double + get_speed() const { + return speed_; + } + +public: + std::string + Dump() const { + std::stringstream ss; + ss << ""; + return ss.str(); + } + +private: + std::string name_; + double speed_; +}; + + +} +} +} diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h new file mode 100644 index 0000000000..995615d1ab --- /dev/null +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -0,0 +1,47 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include + +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class CpuResource : public Resource { +public: + explicit + CpuResource(std::string name) + : Resource(std::move(name), ResourceType::CPU) {} + +protected: + void + LoadFile(TaskPtr task) override { +// if (src.type == DISK) { +// fd = open(filename); +// content = fd.read(); +// close(fd); +// } else if (src.type == CPU) { +// memcpy(src, dest, len); +// } else if (src.type == GPU) { +// cudaMemcpyD2H(src, dest); +// } else { +// // unknown type, exception +// } + } + + void + Process(TaskPtr task) override { + task->Execute(); + } +}; + +} +} +} diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h new file mode 100644 index 0000000000..b4ff32e75e --- /dev/null +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -0,0 +1,21 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + + +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class DiskResource : public Resource { +}; + +} +} +} diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h new file mode 100644 index 0000000000..be91950b78 --- /dev/null +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -0,0 +1,21 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + + +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class GpuResource : public Resource { +}; + +} +} +} diff --git a/cpp/src/scheduler/resource/Node.h b/cpp/src/scheduler/resource/Node.h new file mode 100644 index 0000000000..a32c14ec51 --- /dev/null +++ b/cpp/src/scheduler/resource/Node.h @@ -0,0 +1,55 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + +#include "../TaskTable.h" +#include "Connection.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Node; + +using NeighbourNodePtr = std::weak_ptr; + +struct Neighbour { + NeighbourNodePtr neighbour_node; + Connection connection; +}; + +class Node { +public: + void + AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { + Neighbour neighbour{.neighbour_node = neighbour_node, .connection = connection}; + neighbours_.push_back(neighbour); + } + + void + DelNeighbour(NeighbourNodePtr &neighbour_ptr); + + bool + IsNeighbour(NeighbourNodePtr &neighbour_ptr); + + std::vector + GetNeighbours(); + +public: + std::string + Dump(); + +private: + std::vector neighbours_; +}; + +} +} +} diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h new file mode 100644 index 0000000000..5b7bb13a80 --- /dev/null +++ b/cpp/src/scheduler/resource/Resource.h @@ -0,0 +1,229 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "../TaskTable.h" +#include "../Task.h" +#include "../Cost.h" +#include "Node.h" +#include "Connection.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class ResourceType { + DISK = 0, + CPU = 1, + GPU = 2 +}; + +class Resource : public Node { +public: + void + Start() { + loader_thread_ = std::thread(&Resource::loader_function, this); + executor_thread_ = std::thread(&Resource::executor_function, this); + } + + void + Stop() { + running_ = false; + WakeupLoader(); + WakeupExecutor(); + } + + TaskTable & + task_table() { + return task_table_; + } + +public: + /* + * wake up executor; + */ + void + WakeupExecutor() { + exec_cv_.notify_one(); + } + + /* + * wake up loader; + */ + void + WakeupLoader() { + load_cv_.notify_one(); + } + +public: + /* + * Event function MUST be a short function, never blocking; + */ + + /* + * Register on start up event; + */ + void + RegisterOnStartUp(std::function func); + + /* + * Register on finish one task event; + */ + void + RegisterOnFinishTask(std::function func); + + /* + * Register on copy task data completed event; + */ + void + RegisterOnCopyCompleted(std::function func); + + /* + * Register on task table updated event; + */ + void + RegisterOnTaskTableUpdated(std::function func); + +protected: + Resource(std::string name, ResourceType type) + : name_(std::move(name)), + type_(type), + on_start_up_(nullptr), + on_finish_task_(nullptr), + on_copy_completed_(nullptr), + on_task_table_updated_(nullptr), + running_(false), + load_flag_(false), + exec_flag_(false) { + } + + // TODO: SearchContextPtr to TaskPtr + /* + * Implementation by inherit class; + * Blocking function; + */ + virtual void + LoadFile(TaskPtr task) = 0; + + /* + * Implementation by inherit class; + * Blocking function; + */ + virtual void + Process(TaskPtr task) = 0; + +private: + /* + * These function should move to cost.h ??? + * COST.H ??? + */ + + /* + * Pick one task to load; + * Order by start time; + */ + TaskPtr + pick_task_load() { + auto tasks = PickToLoad(task_table_, 3); + for (uint64_t i = 0; i < tasks.size(); ++i) { + // try to set one task loading, then return + if (task_table_.Load(i)) + return task_table_.Get(i).task; + // else try next + } + return nullptr; + } + + /* + * Pick one task to execute; + * Pick by start time and priority; + */ + TaskPtr + pick_task_execute() { + auto tasks = PickToExecute(task_table_, 3); + for (uint64_t i = 0; i < tasks.size(); ++i) { + // try to set one task executing, then return + if (task_table_.Execute(i)) + return task_table_.Get(i).task; + // else try next + } + return nullptr; + } + +private: + /* + * Only called by load thread; + */ + void + loader_function() { + while (running_) { + std::unique_lock lock(load_mutex_); + load_cv_.wait(lock, [&] { return load_flag_; }); + auto task = pick_task_load(); + if (task) { + LoadFile(task); + on_copy_completed_(); + } + } + + } + + /* + * Only called by worker thread; + */ + void + executor_function() { + on_start_up_(); + while (running_) { + std::unique_lock lock(exec_mutex_); + exec_cv_.wait(lock, [&] { return exec_flag_; }); + auto task = pick_task_execute(); + if (task) { + Process(task); + on_finish_task_(); + } + } + } + + +private: + std::string name_; + ResourceType type_; + + TaskTable task_table_; + + std::function on_start_up_; + std::function on_finish_task_; + std::function on_copy_completed_; + std::function on_task_table_updated_; + + bool running_; + std::thread loader_thread_; + std::thread executor_thread_; + + bool load_flag_; + bool exec_flag_; + std::mutex load_mutex_; + std::mutex exec_mutex_; + std::condition_variable load_cv_; + std::condition_variable exec_cv_; +}; + +using ResourcePtr = std::shared_ptr; +using ResourceWPtr = std::weak_ptr; + +} +} +} + diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index aa96fa6e25..287fe51128 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -42,4 +42,5 @@ add_subdirectory(server) add_subdirectory(db) add_subdirectory(knowhere) add_subdirectory(metrics) +add_subdirectory(scheduler) #add_subdirectory(storage) \ No newline at end of file diff --git a/cpp/unittest/scheduler/CMakeLists.txt b/cpp/unittest/scheduler/CMakeLists.txt new file mode 100644 index 0000000000..d62d80f969 --- /dev/null +++ b/cpp/unittest/scheduler/CMakeLists.txt @@ -0,0 +1,36 @@ +#------------------------------------------------------------------------------- +# Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +# Unauthorized copying of this file, via any medium is strictly prohibited. +# Proprietary and confidential. +#------------------------------------------------------------------------------- +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) +aux_source_directory(./ test_srcs) + +include_directories(/usr/local/cuda/include) +link_directories("/usr/local/cuda/lib64") + +include_directories(/usr/include/mysql) + +#add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY) + +set(scheduler_test_src + ${scheduler_resource_srcs} + ${scheduler_srcs} + ${test_srcs} + ) + +cuda_add_executable(scheduler_test ${scheduler_test_src}) + +set(scheduler_libs + sqlite + boost_system_static + boost_filesystem_static + lz4 + mysqlpp + ) + +target_link_libraries(scheduler_test ${scheduler_libs} ${unittest_libs}) + +install(TARGETS scheduler_test DESTINATION bin) + diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp new file mode 100644 index 0000000000..7318efc9df --- /dev/null +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -0,0 +1,39 @@ +#include "scheduler/ResourceFactory.h" +#include "scheduler/ResourceMgr.h" +#include "scheduler/Scheduler.h" +#include + + +using namespace zilliz::milvus::engine; + +int main() { + + // ResourceMgr only compose resources, provide unified event + auto res_mgr = std::make_shared(); + auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd")); + auto cpu = res_mgr->Add(ResourceFactory::Create("cpu")); + auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu")); + auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu")); + + auto IO = Connection("IO", 500.0); + auto PCIE = Connection("IO", 11000.0); + res_mgr->Connect(disk, cpu, IO); + res_mgr->Connect(cpu, gpu1, PCIE); + res_mgr->Connect(cpu, gpu2, PCIE); + + res_mgr->StartAll(); + + auto task1 = std::make_shared("123456789"); + auto task2 = std::make_shared("222222222"); + if (auto observe = disk.lock()) { + observe->task_table().Put(task1); + observe->task_table().Put(task2); + observe->task_table().Put(task1); + observe->task_table().Put(task1); + } + + auto scheduler = new Scheduler(res_mgr); + scheduler->Start(); + + while (true) sleep(1); +} From 6efc7398cc43baf62f0fcb14b15526b4761eb8eb Mon Sep 17 00:00:00 2001 From: wxyu Date: Tue, 13 Aug 2019 11:26:21 +0800 Subject: [PATCH 2/4] MS-336 scheduler interface Former-commit-id: 3c66c2027cfad489d7bf7cde2a6a577cd92dc8a3 --- CHANGELOGS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOGS.md b/CHANGELOGS.md index c8ee3b39fd..a0c7ed62c5 100644 --- a/CHANGELOGS.md +++ b/CHANGELOGS.md @@ -19,3 +19,4 @@ Please mark all change in change log and use the ticket from JIRA. - MS-202 - Add Milvus Jenkins project email notification - MS-215 - Add Milvus cluster CI/CD groovy file - MS-277 - Update CUDA Version to V10.1 +- MS-336 - Scheduler interface From 549b18f871f9934932fd3c0d2eaf1d473bfde0d8 Mon Sep 17 00:00:00 2001 From: starlord Date: Tue, 13 Aug 2019 15:22:01 +0800 Subject: [PATCH 3/4] MS-343 implement ResourceMgr Former-commit-id: 4cafc2cadf229ab61952233f1abc04d4ffdf9129 --- cpp/CHANGELOG.md | 1 + cpp/src/CMakeLists.txt | 2 + cpp/src/scheduler/ResourceMgr.cpp | 105 ++++++++++++++++++++++++++++++ cpp/src/scheduler/ResourceMgr.h | 58 ++++++----------- 4 files changed, 128 insertions(+), 38 deletions(-) create mode 100644 cpp/src/scheduler/ResourceMgr.cpp diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 26b0226949..189757cc14 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -10,6 +10,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-327 - Clean code for milvus ## New Feature +- MS-343 - Implement ResourceMgr ## Task - MS-297 - disable mysql unit test diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 63edcd3993..ba4e334480 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -16,6 +16,7 @@ aux_source_directory(db/insert db_insert_files) aux_source_directory(db/meta db_meta_files) aux_source_directory(metrics metrics_files) aux_source_directory(wrapper/knowhere knowhere_files) +aux_source_directory(scheduler new_scheduler_files) aux_source_directory(db/scheduler scheduler_files) aux_source_directory(db/scheduler/context scheduler_context_files) @@ -62,6 +63,7 @@ set(db_files ${db_insert_files} ${db_meta_files} ${db_scheduler_files} + ${new_scheduler_files} ${metrics_files} ${knowhere_files} ) diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp new file mode 100644 index 0000000000..faf422b849 --- /dev/null +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -0,0 +1,105 @@ + +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "ResourceMgr.h" +#include "db/Log.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +ResourceMgr::ResourceMgr() + : running_(false) { + +} + +ResourceWPtr +ResourceMgr::Add(ResourcePtr &&resource) { + ResourceWPtr ret(resource); + + std::lock_guard lck(resources_mutex_); + if(running_) { + ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource"; + return ret; + } + + resources_.emplace_back(resource); + + size_t index = resources_.size() - 1; + resource->RegisterOnStartUp([&] { + start_up_event_[index] = true; + event_cv_.notify_one(); + }); + resource->RegisterOnFinishTask([&] { + finish_task_event_[index] = true; + event_cv_.notify_one(); + }); + return ret; +} + +void +ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) { + if (auto observe_a = res1.lock()) { + if (auto observe_b = res2.lock()) { + observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); + } + } +} + +void +ResourceMgr::EventProcess() { + while (running_) { + std::unique_lock lock(resources_mutex_); + event_cv_.wait(lock, [this] { return !resources_.empty(); }); + + if(!running_) { + break; + } + + for (uint64_t i = 0; i < resources_.size(); ++i) { + ResourceWPtr res(resources_[i]); + if (start_up_event_[i]) { + on_start_up_(res); + } + if (finish_task_event_[i]) { + on_finish_task_(res); + } + if (copy_completed_event_[i]) { + on_copy_completed_(res); + } + if (task_table_updated_event_[i]) { + on_task_table_updated_(res); + } + } + } +} + +void +ResourceMgr::Start() { + std::lock_guard lck(resources_mutex_); + for (auto &resource : resources_) { + resource->Start(); + } + worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); + + running_ = true; +} + +void +ResourceMgr::Stop() { + std::lock_guard lck(resources_mutex_); + + running_ = false; + worker_thread_.join(); + + for (auto &resource : resources_) { + resource->Stop(); + } +} + +} +} +} diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index e1cc9110bf..e7a7650695 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -9,7 +9,10 @@ #include #include #include +#include +#include +#include "resource/Resource.h" namespace zilliz { namespace milvus { @@ -17,7 +20,7 @@ namespace engine { class ResourceMgr { public: - ResourceMgr() : running_(false) {} + ResourceMgr(); /******** Management Interface ********/ @@ -27,42 +30,24 @@ public: * Functions only modify bool variable, like event trigger; */ ResourceWPtr - Add(ResourcePtr &&resource) { - ResourceWPtr ret(resource); - resources_.emplace_back(resource); - -// resource->RegisterOnStartUp([] { -// start_up_event_[index] = true; -// }); -// resource.RegisterOnFinishTask([] { -// finish_task_event_[index] = true; -// }); - return ret; - } + Add(ResourcePtr &&resource); /* * Create connection between A and B; */ void - Connect(ResourceWPtr &A, ResourceWPtr &B, Connection &connection) { - if (auto observe_a = A.lock()) { - if (auto observe_b = B.lock()) { - observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); - } - } - } + Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection); /* * Synchronous start all resource; * Last, start event process thread; */ void - StartAll() { - for (auto &resource : resources_) { - resource->Start(); - } - worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); - } + Start(); + + void + Stop(); + // TODO: add stats interface(low) @@ -89,13 +74,17 @@ public: * Register on copy task data completed event; */ void - RegisterOnCopyCompleted(std::function &func); + RegisterOnCopyCompleted(std::function &func) { + on_copy_completed_ = func; + } /* * Register on task table updated event; */ void - RegisterOnTaskTableUpdated(std::function &func); + RegisterOnTaskTableUpdated(std::function &func) { + on_task_table_updated_ = func; + } public: /******** Utlitity Functions ********/ @@ -105,23 +94,16 @@ public: private: void - EventProcess() { - while (running_) { - for (uint64_t i = 0; i < resources_.size(); ++i) { - if (start_up_event_[i]) { - on_start_up_(resources_[i]); - } - } - } - - } + EventProcess(); private: bool running_; std::vector resources_; + mutable std::mutex resources_mutex_; std::thread worker_thread_; + std::condition_variable event_cv_; std::vector start_up_event_; std::vector finish_task_event_; std::vector copy_completed_event_; From 3e47b26d9f20a73970096ca1ffe413fd47c5a3b9 Mon Sep 17 00:00:00 2001 From: starlord Date: Tue, 13 Aug 2019 15:28:27 +0800 Subject: [PATCH 4/4] MS-343 implement ResourceMgr Former-commit-id: f0f04297bb219cfa20e4eba609ff390f84e3a4a3 --- cpp/src/scheduler/ResourceMgr.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index faf422b849..d033bd9298 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -63,15 +63,19 @@ ResourceMgr::EventProcess() { ResourceWPtr res(resources_[i]); if (start_up_event_[i]) { on_start_up_(res); + start_up_event_[i] = false; } if (finish_task_event_[i]) { on_finish_task_(res); + finish_task_event_[i] = false; } if (copy_completed_event_[i]) { on_copy_completed_(res); + copy_completed_event_[i] = false; } if (task_table_updated_event_[i]) { on_task_table_updated_(res); + task_table_updated_event_[i] = false; } } } @@ -100,6 +104,18 @@ ResourceMgr::Stop() { } } +std::string +ResourceMgr::Dump() { + std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n"; + + for (uint64_t i = 0; i < resources_.size(); ++i) { + str += "Resource No." + std::to_string(i) + ":\n"; + str += resources_[i]->Dump(); + } + + return str; +} + } } }