diff --git a/ci/jenkinsfile/publish_docker.groovy b/ci/jenkinsfile/publish_docker.groovy index d43b06c9d3..cb8e686310 100644 --- a/ci/jenkinsfile/publish_docker.groovy +++ b/ci/jenkinsfile/publish_docker.groovy @@ -8,7 +8,7 @@ container('publish-docker') { sh "curl -O -u anonymous: ftp://192.168.1.126/data/${PROJECT_NAME}/engine/${JOB_NAME}-${BUILD_ID}/${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz" sh "tar zxvf ${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz" try { - docker.withRegistry('https://registry.zilliz.com', 'a54e38ef-c424-4ea9-9224-b25fc20e3924') { + docker.withRegistry('https://registry.zilliz.com', '${params.DOCKER_PUBLISH_USER}') { def customImage = docker.build("${PROJECT_NAME}/engine:${DOCKER_VERSION}") customImage.push() } diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 6f1c49e9f4..e0f3b1de25 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -16,6 +16,22 @@ Please mark all change in change log and use the ticket from JIRA. - MS-350 - Remove knowhere submodule - MS-354 - Add task class and interface in scheduler - MS-355 - Add copy interface in ExcutionEngine +- MS-357 - Add minimum schedule function +- MS-359 - Add cost test in new scheduler +- MS-361 - Add event in resource +- MS-364 - Modify tasktableitem in tasktable +- MS-365 - Use tasktableitemptr instead in event +- MS-366 - Implement TaskTable +- MS-368 - Implement cost.cpp +- MS-371 - Add TaskTableUpdatedEvent +- MS-373 - Add resource test +- MS-374 - Add action definition +- MS-375 - Add Dump implementation for Event +- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task +- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable +- MS-378 - Debug and Update normal_test in scheduler unittest +- MS-379 - Add Dump implementation in Resource +- MS-380 - Update resource loader and executor, work util all finished ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/db/meta/SqliteMetaImpl.cpp 
b/cpp/src/db/meta/SqliteMetaImpl.cpp index 9411a5e82b..9053139e0b 100644 --- a/cpp/src/db/meta/SqliteMetaImpl.cpp +++ b/cpp/src/db/meta/SqliteMetaImpl.cpp @@ -606,9 +606,9 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, &TableFileSchema::engine_type_); auto match_tableid = c(&TableFileSchema::table_id_) == table_id; - auto is_raw = c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW; - auto is_toindex = c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX; - auto is_index = c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX; + + std::vector file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX}; + auto match_type = in(&TableFileSchema::file_type_, file_type); TableSchema table_schema; table_schema.table_id_ = table_id; @@ -617,23 +617,23 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, decltype(ConnectorPtr->select(select_columns)) result; if (partition.empty() && ids.empty()) { - auto filter = where(match_tableid and (is_raw or is_toindex or is_index)); + auto filter = where(match_tableid and match_type); result = ConnectorPtr->select(select_columns, filter); } else if (partition.empty() && !ids.empty()) { auto match_fileid = in(&TableFileSchema::id_, ids); - auto filter = where(match_tableid and match_fileid and (is_raw or is_toindex or is_index)); + auto filter = where(match_tableid and match_fileid and match_type); result = ConnectorPtr->select(select_columns, filter); } else if (!partition.empty() && ids.empty()) { auto match_date = in(&TableFileSchema::date_, partition); - auto filter = where(match_tableid and match_date and (is_raw or is_toindex or is_index)); + auto filter = where(match_tableid and match_date and match_type); result = ConnectorPtr->select(select_columns, filter); } else if (!partition.empty() && !ids.empty()) { auto match_fileid = in(&TableFileSchema::id_, ids); auto match_date = in(&TableFileSchema::date_, 
partition); - auto filter = where(match_tableid and match_fileid and match_date and (is_raw or is_toindex or is_index)); + auto filter = where(match_tableid and match_fileid and match_date and match_type); result = ConnectorPtr->select(select_columns, filter); } diff --git a/cpp/src/scheduler/Cost.cpp b/cpp/src/scheduler/Cost.cpp index 0f7c30c6a7..724a717d2f 100644 --- a/cpp/src/scheduler/Cost.cpp +++ b/cpp/src/scheduler/Cost.cpp @@ -12,22 +12,40 @@ namespace milvus { namespace engine { std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) { +PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) { std::vector indexes; + for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { + if (task_table[i]->state == TaskTableItemState::LOADED) { + indexes.push_back(i); + ++count; + } + } return indexes; } std::vector -PickToLoad(const TaskTable &task_table, uint64_t limit) { +PickToLoad(TaskTable &task_table, uint64_t limit) { std::vector indexes; + for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { + if (task_table[i]->state == TaskTableItemState::START) { + indexes.push_back(i); + ++count; + } + } return indexes; } std::vector -PickToExecute(const TaskTable &task_table, uint64_t limit) { +PickToExecute(TaskTable &task_table, uint64_t limit) { std::vector indexes; + for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { + if (task_table[i]->state == TaskTableItemState::LOADED) { + indexes.push_back(i); + ++count; + } + } return indexes; } diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h index 98de9d8fc1..76f16d4d1d 100644 --- a/cpp/src/scheduler/Cost.h +++ b/cpp/src/scheduler/Cost.h @@ -23,7 +23,7 @@ namespace engine { * call from scheduler; */ std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit); +PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit); /* @@ 
-32,7 +32,7 @@ PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) * I DONT SURE NEED THIS; */ std::vector -PickToLoad(const TaskTable &task_table, uint64_t limit); +PickToLoad(TaskTable &task_table, uint64_t limit); /* * select task to execute; @@ -40,7 +40,7 @@ PickToLoad(const TaskTable &task_table, uint64_t limit); * I DONT SURE NEED THIS; */ std::vector -PickToExecute(const TaskTable &task_table, uint64_t limit); +PickToExecute(TaskTable &task_table, uint64_t limit); } diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index b24e08a413..916aaa238a 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -7,6 +7,7 @@ #include "ResourceMgr.h" #include "db/Log.h" + namespace zilliz { namespace milvus { namespace engine { @@ -21,30 +22,18 @@ ResourceMgr::Add(ResourcePtr &&resource) { ResourceWPtr ret(resource); std::lock_guard lck(resources_mutex_); - if(running_) { + if (running_) { ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource"; return ret; } + if (resource->Type() == ResourceType::DISK) { + disk_resources_.emplace_back(ResourceWPtr(resource)); + } resources_.emplace_back(resource); size_t index = resources_.size() - 1; - resource->RegisterOnStartUp([&] { - start_up_event_[index] = true; - event_cv_.notify_one(); - }); - resource->RegisterOnFinishTask([&] { - finish_task_event_[index] = true; - event_cv_.notify_one(); - }); - resource->RegisterOnCopyCompleted([&] { - copy_completed_event_[index] = true; - event_cv_.notify_one(); - }); - resource->RegisterOnTaskTableUpdated([&] { - task_table_updated_event_[index] = true; - event_cv_.notify_one(); - }); + resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1)); return ret; } @@ -53,41 +42,11 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect if (auto observe_a = res1.lock()) { if (auto observe_b = res2.lock()) { 
observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); + observe_b->AddNeighbour(std::static_pointer_cast(observe_a), connection); } } } -void -ResourceMgr::EventProcess() { - while (running_) { - std::unique_lock lock(resources_mutex_); - event_cv_.wait(lock, [this] { return !resources_.empty(); }); - - if(!running_) { - break; - } - - for (uint64_t i = 0; i < resources_.size(); ++i) { - ResourceWPtr res(resources_[i]); - if (start_up_event_[i]) { - on_start_up_(res); - start_up_event_[i] = false; - } - if (finish_task_event_[i]) { - on_finish_task_(res); - finish_task_event_[i] = false; - } - if (copy_completed_event_[i]) { - on_copy_completed_(res); - copy_completed_event_[i] = false; - } - if (task_table_updated_event_[i]) { - on_task_table_updated_(res); - task_table_updated_event_[i] = false; - } - } - } -} void ResourceMgr::Start() { @@ -95,23 +54,33 @@ ResourceMgr::Start() { for (auto &resource : resources_) { resource->Start(); } - worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); - running_ = true; + worker_thread_ = std::thread(&ResourceMgr::event_process, this); } void ResourceMgr::Stop() { - std::lock_guard lck(resources_mutex_); - - running_ = false; + { + std::lock_guard lock(event_mutex_); + running_ = false; + queue_.push(nullptr); + event_cv_.notify_one(); + } worker_thread_.join(); + std::lock_guard lck(resources_mutex_); for (auto &resource : resources_) { resource->Stop(); } } +void +ResourceMgr::PostEvent(const EventPtr &event) { + std::unique_lock lock(event_mutex_); + queue_.emplace(event); + event_cv_.notify_one(); +} + std::string ResourceMgr::Dump() { std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n"; @@ -124,6 +93,26 @@ ResourceMgr::Dump() { return str; } +void +ResourceMgr::event_process() { + while (running_) { + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !queue_.empty(); }); + + auto event = queue_.front(); + if (event == 
nullptr) { + break; + } + +// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event; + + queue_.pop(); + if (subscriber_) { + subscriber_(event); + } + } +} + } } } diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index e7a7650695..cb2e631935 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -10,9 +10,12 @@ #include #include #include +#include #include #include "resource/Resource.h" +#include "utils/Log.h" + namespace zilliz { namespace milvus { @@ -23,6 +26,15 @@ public: ResourceMgr(); /******** Management Interface ********/ + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } + + std::vector & + GetDiskResources() { + return disk_resources_; + } /* * Add resource into Resource Management; @@ -48,44 +60,11 @@ public: void Stop(); + void + PostEvent(const EventPtr& event); // TODO: add stats interface(low) -public: - /******** Event Register Interface ********/ - - /* - * Register on start up event; - */ - void - RegisterOnStartUp(std::function &func) { - on_start_up_ = func; - } - - /* - * Register on finish one task event; - */ - void - RegisterOnFinishTask(std::function &func) { - on_finish_task_ = func; - } - - /* - * Register on copy task data completed event; - */ - void - RegisterOnCopyCompleted(std::function &func) { - on_copy_completed_ = func; - } - - /* - * Register on task table updated event; - */ - void - RegisterOnTaskTableUpdated(std::function &func) { - on_task_table_updated_ = func; - } - public: /******** Utlitity Functions ********/ @@ -94,27 +73,25 @@ public: private: void - EventProcess(); + event_process(); private: + std::queue queue_; + std::function subscriber_ = nullptr; + bool running_; + std::vector disk_resources_; std::vector resources_; mutable std::mutex resources_mutex_; std::thread worker_thread_; + std::mutex event_mutex_; std::condition_variable event_cv_; - std::vector start_up_event_; - std::vector 
finish_task_event_; - std::vector copy_completed_event_; - std::vector task_table_updated_event_; - std::function on_start_up_; - std::function on_finish_task_; - std::function on_copy_completed_; - std::function on_task_table_updated_; }; +using ResourceMgrPtr = std::shared_ptr; using ResourceMgrWPtr = std::weak_ptr; } diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index c5ae928166..191d1957aa 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -5,40 +5,46 @@ ******************************************************************************/ #include "Scheduler.h" +#include "Cost.h" +#include "action/Action.h" namespace zilliz { namespace milvus { namespace engine { -void -StartUpEvent::Process() { - -} - -void -FinishTaskEvent::Process() { -// for (nei : res->neighbours) { -// tasks = cost(nei->task_table(), nei->connection, limit = 3) -// res->task_table()->PutTasks(tasks); -// } -// res->WakeUpExec(); -} - -void -CopyCompletedEvent::Process() { - -} - -void -TaskTableUpdatedEvent::Process() { - +Scheduler::Scheduler(ResourceMgrWPtr res_mgr) + : running_(false), + res_mgr_(std::move(res_mgr)) { + if (auto mgr = res_mgr_.lock()) { + mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); + } } void Scheduler::Start() { - worker_thread_ = std::thread(&Scheduler::worker_thread_, this); + running_ = true; + worker_thread_ = std::thread(&Scheduler::worker_function, this); +} + +void +Scheduler::Stop() { + { + std::lock_guard lock(event_mutex_); + running_ = false; + event_queue_.push(nullptr); + event_cv_.notify_one(); + } + worker_thread_.join(); +} + +void +Scheduler::PostEvent(const EventPtr &event) { + std::lock_guard lock(event_mutex_); + event_queue_.push(event); + event_cv_.notify_one(); +// SERVER_LOG_DEBUG << "Scheduler post " << *event; } std::string @@ -49,8 +55,76 @@ Scheduler::Dump() { void Scheduler::worker_function() { while (running_) { + std::unique_lock 
lock(event_mutex_); + event_cv_.wait(lock, [this] { return !event_queue_.empty(); }); auto event = event_queue_.front(); - event->Process(); + if (event == nullptr) { + break; + } + +// SERVER_LOG_DEBUG << "Scheduler process " << *event; + event_queue_.pop(); + Process(event); + } +} + +void +Scheduler::Process(const EventPtr &event) { + switch (event->Type()) { + case EventType::START_UP: { + OnStartUp(event); + break; + } + case EventType::COPY_COMPLETED: { + OnCopyCompleted(event); + break; + } + case EventType::FINISH_TASK: { + OnFinishTask(event); + break; + } + case EventType::TASK_TABLE_UPDATED: { + OnTaskTableUpdated(event); + break; + } + default: { + // TODO: logging + break; + } + } +} + + +void +Scheduler::OnStartUp(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } +} + +void +Scheduler::OnFinishTask(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupExecutor(); + } +} + +void +Scheduler::OnCopyCompleted(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + resource->WakeupExecutor(); + if (resource->Type()== ResourceType::DISK) { + Action::PushTaskToNeighbour(event->resource_); + } + } +} + +void +Scheduler::OnTaskTableUpdated(const EventPtr &event) { +// Action::PushTaskToNeighbour(event->resource_); + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); } } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 152937a1d2..012a479a82 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -5,6 +5,7 @@ ******************************************************************************/ #pragma once +#include #include #include #include @@ -12,128 +13,93 @@ #include "resource/Resource.h" #include "ResourceMgr.h" +#include "utils/Log.h" namespace zilliz { namespace milvus { namespace engine { -class Event { -public: - explicit - 
Event(ResourceWPtr &resource) : resource_(resource) {} - -public: - virtual void - Process() = 0; - -private: - ResourceWPtr resource_; -}; - -using EventPtr = std::shared_ptr; - -class StartUpEvent : public Event { -public: - explicit - StartUpEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class FinishTaskEvent : public Event { -public: - explicit - FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class CopyCompletedEvent : public Event { -public: - explicit - CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class TaskTableUpdatedEvent : public Event { -public: - explicit - TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; class Scheduler { public: explicit - Scheduler(ResourceMgrWPtr res_mgr) - : running_(false), - res_mgr_(std::move(res_mgr)) { -// res_mgr.Register(); -// res_mgr.Register(); -// res_mgr.Register(); -// res_mgr.Register(); - } + Scheduler(ResourceMgrWPtr res_mgr); + Scheduler(const Scheduler &) = delete; + Scheduler(Scheduler &&) = delete; + + /* + * Start worker thread; + */ void Start(); -public: + /* + * Stop worker thread, join it; + */ + void + Stop(); + + /* + * Post event to scheduler event queue; + */ + void + PostEvent(const EventPtr &event); + + /* + * Dump as string; + */ + std::string + Dump(); + +private: /******** Events ********/ /* * Process start up events; + * + * Actions: + * Pull task from neighbours; */ - inline void - OnStartUp(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + OnStartUp(const EventPtr &event); /* * Process finish task events; + * + * Actions: + * Pull task from neighbours; */ - inline void - OnFinishTask(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + 
OnFinishTask(const EventPtr &event); /* * Process copy completed events; + * + * Actions: + * Mark task source MOVED; + * Pull task from neighbours; */ - inline void - OnCopyCompleted(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + OnCopyCompleted(const EventPtr &event); /* - * Process task table updated events; + * Process task table updated events, which happened on task_table->put; + * + * Actions: + * Push task to neighbours; */ - inline void - OnTaskTableUpdated(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } - - -public: - std::string - Dump(); - + void + OnTaskTableUpdated(const EventPtr &event); private: + /* + * Dispatch event to event handler; + */ + void + Process(const EventPtr &event); + /* * Called by worker_thread_; */ @@ -146,8 +112,12 @@ private: ResourceMgrWPtr res_mgr_; std::queue event_queue_; std::thread worker_thread_; + std::mutex event_mutex_; + std::condition_variable event_cv_; }; +using SchedulerPtr = std::shared_ptr; + } } } diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 1e77318e86..7ef033c500 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -5,29 +5,46 @@ ******************************************************************************/ #include "TaskTable.h" +#include "event/TaskTableUpdatedEvent.h" #include +#include namespace zilliz { namespace milvus { namespace engine { -TaskTable::TaskTable(std::vector &&tasks) { - -} void TaskTable::Put(TaskPtr task) { - + std::lock_guard lock(id_mutex_); + auto item = std::make_shared(); + item->id = id_++; + item->task = std::move(task); + item->state = TaskTableItemState::START; + table_.push_back(item); + if (subscriber_) { + subscriber_(); + } } void TaskTable::Put(std::vector &tasks) { - + std::lock_guard lock(id_mutex_); + for (auto &task : tasks) { + auto item = std::make_shared(); + item->id = id_++; + 
item->task = std::move(task); + item->state = TaskTableItemState::START; + table_.push_back(item); + } + if (subscriber_) { + subscriber_(); + } } -TaskTableItem & +TaskTableItemPtr TaskTable::Get(uint64_t index) { return table_[index]; } @@ -46,9 +63,9 @@ bool TaskTable::Move(uint64_t index) { auto &task = table_[index]; - std::lock_guard lock(task.mutex); - if (task.state == TaskTableItemState::START) { - task.state = TaskTableItemState::LOADING; + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::LOADED) { + task->state = TaskTableItemState::MOVING; return true; } return false; @@ -56,32 +73,88 @@ TaskTable::Move(uint64_t index) { bool TaskTable::Moved(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::MOVING) { + task->state = TaskTableItemState::MOVED; + return true; + } return false; } bool TaskTable::Load(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::START) { + task->state = TaskTableItemState::LOADING; + return true; + } return false; } bool TaskTable::Loaded(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::LOADING) { + task->state = TaskTableItemState::LOADED; + return true; + } return false; } bool TaskTable::Execute(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::LOADED) { + task->state = TaskTableItemState::EXECUTING; + return true; + } return false; } bool TaskTable::Executed(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::EXECUTING) { + task->state = TaskTableItemState::EXECUTED; + return true; + } return false; } +std::string +ToString(TaskTableItemState state) { + switch (state) { + case TaskTableItemState::INVALID: return "INVALID"; 
+ case TaskTableItemState::START: return "START"; + case TaskTableItemState::LOADING: return "LOADING"; + case TaskTableItemState::LOADED: return "LOADED"; + case TaskTableItemState::EXECUTING: return "EXECUTING"; + case TaskTableItemState::EXECUTED: return "EXECUTED"; + case TaskTableItemState::MOVING: return "MOVING"; + case TaskTableItemState::MOVED: return "MOVED"; + default: return ""; + } +} + std::string TaskTable::Dump() { - return std::string(); + std::stringstream ss; + for (auto &item : table_) { + ss << "<" << item->id; + ss << ", " << ToString(item->state); + ss << ">" << std::endl; + } + return ss.str(); } } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index dc286d8f74..8a482d0579 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -10,6 +10,7 @@ #include #include "task/SearchTask.h" +#include "event/Event.h" namespace zilliz { @@ -31,7 +32,7 @@ struct TaskTableItem { TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {} TaskTableItem(const TaskTableItem &src) - : id(src.id), state(src.state), mutex(), priority(src.priority) {} + : id(src.id), state(src.state), mutex(), priority(src.priority) {} uint64_t id; // auto increment from 0; // TODO: add tag into task @@ -42,12 +43,19 @@ struct TaskTableItem { uint8_t priority; // just a number, meaningless; }; +using TaskTableItemPtr = std::shared_ptr; + class TaskTable { public: TaskTable() = default; - explicit - TaskTable(std::vector &&tasks); + TaskTable(const TaskTable &) = delete; + TaskTable(TaskTable &&) = delete; + + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } /* * Put one task; @@ -65,7 +73,7 @@ public: /* * Return task table item reference; */ - TaskTableItem & + TaskTableItemPtr Get(uint64_t index); /* @@ -76,6 +84,29 @@ public: void Clear(); + /* + * Return true if task table empty, otherwise false; + */ + inline bool + Empty() { + return 
table_.empty(); + } + + /* + * Return size of task table; + */ + inline size_t + Size() { + return table_.size(); + } +public: + TaskTableItemPtr & + operator[](uint64_t index) { + return table_[index]; + } + + std::deque::iterator begin() { return table_.begin(); } + std::deque::iterator end() { return table_.end(); } public: @@ -139,7 +170,10 @@ public: private: // TODO: map better ? - std::deque table_; + std::uint64_t id_ = 0; + mutable std::mutex id_mutex_; + std::deque table_; + std::function subscriber_ = nullptr; }; diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h new file mode 100644 index 0000000000..d72bbefc8d --- /dev/null +++ b/cpp/src/scheduler/action/Action.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "../resource/Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Action { +public: + /* + * Push task to neighbour; + */ + static void + PushTaskToNeighbour(const ResourceWPtr &self); + + + /* + * Pull task From neighbour; + */ + static void + PullTaskFromNeighbour(const ResourceWPtr &self); +}; + + +} +} +} diff --git a/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp b/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp new file mode 100644 index 0000000000..b1ac97b6e4 --- /dev/null +++ b/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. 
+ ******************************************************************************/ + +#include "Action.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +void +Action::PullTaskFromNeighbour(const ResourceWPtr &self) { + // TODO: implement +} + + +} +} +} + + diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp new file mode 100644 index 0000000000..a9b43b3f05 --- /dev/null +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -0,0 +1,44 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Action.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +void +push_task(const ResourcePtr &self, const ResourcePtr &other) { + auto &self_task_table = self->task_table(); + auto &other_task_table = other->task_table(); + CacheMgr cache; + auto indexes = PickToMove(self_task_table, cache, 1); + for (auto index : indexes) { + if (self_task_table.Move(index)) { + auto task = self_task_table.Get(index)->task; + other_task_table.Put(task); + // TODO: mark moved future + } + } +} + +void +Action::PushTaskToNeighbour(const ResourceWPtr &res) { + if (auto self = res.lock()) { + for (auto &neighbour : self->GetNeighbours()) { + if (auto n = neighbour.neighbour_node.lock()) { + push_task(self, std::static_pointer_cast(n)); + } + } + } +} + + +} +} +} + diff --git a/cpp/src/scheduler/event/CopyCompletedEvent.h b/cpp/src/scheduler/event/CopyCompletedEvent.h new file mode 100644 index 0000000000..d2f5ddb0ff --- /dev/null +++ b/cpp/src/scheduler/event/CopyCompletedEvent.h @@ -0,0 +1,35 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) 
- all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" +#include "../TaskTable.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class CopyCompletedEvent : public Event { +public: + CopyCompletedEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) + : Event(EventType::COPY_COMPLETED, std::move(resource)), + task_table_item_(std::move(task_table_item)) {} + + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event); + +public: + TaskTableItemPtr task_table_item_; +}; + +} +} +} diff --git a/cpp/src/scheduler/event/Event.h b/cpp/src/scheduler/event/Event.h new file mode 100644 index 0000000000..788cfd6a73 --- /dev/null +++ b/cpp/src/scheduler/event/Event.h @@ -0,0 +1,51 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. 
+ ******************************************************************************/ +#pragma once + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class EventType { + START_UP, + COPY_COMPLETED, + FINISH_TASK, + TASK_TABLE_UPDATED +}; + +class Resource; + +class Event { +public: + explicit + Event(EventType type, std::weak_ptr resource) + : type_(type), + resource_(std::move(resource)) {} + + inline EventType + Type() const { + return type_; + } + + inline virtual std::string + Dump() const { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const Event &event); + +public: + EventType type_; + std::weak_ptr resource_; +}; + +using EventPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/scheduler/event/EventDump.cpp b/cpp/src/scheduler/event/EventDump.cpp new file mode 100644 index 0000000000..0d10f6f7b4 --- /dev/null +++ b/cpp/src/scheduler/event/EventDump.cpp @@ -0,0 +1,45 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. 
+ ******************************************************************************/ + +#include "Event.h" +#include "StartUpEvent.h" +#include "CopyCompletedEvent.h" +#include "FinishTaskEvent.h" +#include "TaskTableUpdatedEvent.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +std::ostream &operator<<(std::ostream &out, const Event &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const StartUpEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) { + out << event.Dump(); + return out; +} + +} +} +} diff --git a/cpp/src/scheduler/event/FinishTaskEvent.h b/cpp/src/scheduler/event/FinishTaskEvent.h new file mode 100644 index 0000000000..14daa9b532 --- /dev/null +++ b/cpp/src/scheduler/event/FinishTaskEvent.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. 
+ ******************************************************************************/ +#pragma once + +#include "Event.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class FinishTaskEvent : public Event { +public: + FinishTaskEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) + : Event(EventType::FINISH_TASK, std::move(resource)), + task_table_item_(std::move(task_table_item)) {} + + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event); + +public: + TaskTableItemPtr task_table_item_; +}; + +} +} +} diff --git a/cpp/src/scheduler/event/StartUpEvent.h b/cpp/src/scheduler/event/StartUpEvent.h new file mode 100644 index 0000000000..4b5ec78cd6 --- /dev/null +++ b/cpp/src/scheduler/event/StartUpEvent.h @@ -0,0 +1,31 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. 
+ ******************************************************************************/
+#pragma once
+
+#include "Event.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+class StartUpEvent : public Event {  // event constructed with EventType::START_UP
+public:
+    explicit
+    StartUpEvent(std::weak_ptr resource)
+        : Event(EventType::START_UP, std::move(resource)) {}  // resource handle is moved into the base
+
+    inline std::string
+    Dump() const override {
+        return "";  // NOTE(review): empty literal as shown here -- confirm whether a label was intended
+    }
+
+    friend std::ostream &operator<<(std::ostream &out, const StartUpEvent &event);
+};
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h
new file mode 100644
index 0000000000..f96c30674c
--- /dev/null
+++ b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
+ * unauthorized copying of this file, via any medium is strictly prohibited.
+ * proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "Event.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+class TaskTableUpdatedEvent : public Event {  // event constructed with EventType::TASK_TABLE_UPDATED
+public:
+    explicit
+    TaskTableUpdatedEvent(std::weak_ptr resource)
+        : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {}  // resource handle is moved into the base
+
+    inline std::string
+    Dump() const override {
+        return "";  // NOTE(review): empty literal as shown here -- confirm whether a label was intended
+    }
+
+    friend std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event);
+};
+
+
+}
+}
+}
diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp
index 32eb627046..01fca35ee4 100644
--- a/cpp/src/scheduler/resource/CpuResource.cpp
+++ b/cpp/src/scheduler/resource/CpuResource.cpp
@@ -11,26 +11,20 @@ namespace zilliz {
 namespace milvus {
 namespace engine {
 
+std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
+    out << resource.Dump();  // streams the text returned by Dump()
+    return out;
+}
 
 CpuResource::CpuResource(std::string name)
     : Resource(std::move(name), ResourceType::CPU) {}
 
 void CpuResource::LoadFile(TaskPtr task) {
-    //if (src.type == DISK) {
-    //    fd = open(filename);
-    //    content = fd.read();
-    //    close(fd);
-    //} else if (src.type == CPU) {
-    //    memcpy(src, dest, len);
-    //} else if (src.type == GPU) {
-    //    cudaMemcpyD2H(src, dest);
-    //} else {
-    //    // unknown type, exception
-    //}
+    task->Load(LoadType::DISK2CPU, 0);  // loading is delegated to the task; 0 is passed as the second argument
 }
 
 void CpuResource::Process(TaskPtr task) {
-
+    task->Execute();  // execution is delegated to the task
 }
 
 }
diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h
index be1340e954..c180ea07d7 100644
--- a/cpp/src/scheduler/resource/CpuResource.h
+++ b/cpp/src/scheduler/resource/CpuResource.h
@@ -19,6 +19,13 @@ public:
     explicit
     CpuResource(std::string name);
 
+    inline std::string
+    Dump() const override {
+        return "";  // NOTE(review): empty literal as shown here -- confirm whether a label was intended
+    }
+
+    friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource);
+
 protected:
     void
     LoadFile(TaskPtr task) override;
diff --git
a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp
index dcc0687ac4..8612909e44 100644
--- a/cpp/src/scheduler/resource/DiskResource.cpp
+++ b/cpp/src/scheduler/resource/DiskResource.cpp
@@ -10,9 +10,14 @@ namespace zilliz {
 namespace milvus {
 namespace engine {
 
+std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
+    out << resource.Dump();  // streams the text returned by Dump()
+    return out;
+}
 
 DiskResource::DiskResource(std::string name)
-    : Resource(std::move(name), ResourceType::DISK) {}
+    : Resource(std::move(name), ResourceType::DISK, true, false) {  // trailing flags: enable_loader = true, enable_executor = false
+}
 
 void DiskResource::LoadFile(TaskPtr task) {
 
diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h
index 39211dbb66..0fb9d625aa 100644
--- a/cpp/src/scheduler/resource/DiskResource.h
+++ b/cpp/src/scheduler/resource/DiskResource.h
@@ -18,6 +18,13 @@ public:
     explicit
     DiskResource(std::string name);
 
+    inline std::string
+    Dump() const override {
+        return "";  // NOTE(review): empty literal as shown here -- confirm whether a label was intended
+    }
+
+    friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource);
+
 protected:
     void
     LoadFile(TaskPtr task) override;
diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp
index 00d5df05b4..8606bb7856 100644
--- a/cpp/src/scheduler/resource/GpuResource.cpp
+++ b/cpp/src/scheduler/resource/GpuResource.cpp
@@ -11,16 +11,20 @@ namespace zilliz {
 namespace milvus {
 namespace engine {
 
+std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
+    out << resource.Dump();  // streams the text returned by Dump()
+    return out;
+}
 
 GpuResource::GpuResource(std::string name)
     : Resource(std::move(name), ResourceType::GPU) {}
 
 void GpuResource::LoadFile(TaskPtr task) {
-
+    task->Load(LoadType::CPU2GPU, 0);  // loading is delegated to the task; 0 is passed as the second argument
 }
 
 void GpuResource::Process(TaskPtr task) {
-
+    task->Execute();  // execution is delegated to the task
 }
 
 }
diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h
index 84bf163284..1cb38df34f 100644
--- a/cpp/src/scheduler/resource/GpuResource.h
+++
b/cpp/src/scheduler/resource/GpuResource.h
@@ -18,6 +18,13 @@ public:
     explicit
     GpuResource(std::string name);
 
+    inline std::string
+    Dump() const override {
+        return "";
+    }
+
+    friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource);
+
 protected:
     void
     LoadFile(TaskPtr task) override;
diff --git a/cpp/src/scheduler/resource/Node.cpp b/cpp/src/scheduler/resource/Node.cpp
index 76d61fe858..8e3db29ea2 100644
--- a/cpp/src/scheduler/resource/Node.cpp
+++ b/cpp/src/scheduler/resource/Node.cpp
@@ -55,8 +55,7 @@ std::string Node::Dump() {
 void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
     std::lock_guard lk(mutex_);
     if (auto s = neighbour_node.lock()) {
-        Neighbour neighbour(neighbour_node, connection);
-        neighbours_[s->id_] = neighbour;
+        neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection)));  // NOTE(review): assuming a map-like container, emplace keeps an existing entry where the old assignment overwrote it -- confirm intended
     }
     // else do nothing, consider it..
 }
diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp
index 7f59256474..2c46a703c6 100644
--- a/cpp/src/scheduler/resource/Resource.cpp
+++ b/cpp/src/scheduler/resource/Resource.cpp
@@ -10,54 +10,85 @@ namespace zilliz {
 namespace milvus {
 namespace engine {
 
-Resource::Resource(std::string name, ResourceType type)
+std::ostream &operator<<(std::ostream &out, const Resource &resource) {
+    out << resource.Dump();
+    return out;
+}
+
+Resource::Resource(std::string name,
+                   ResourceType type,
+                   bool enable_loader,
+                   bool enable_executor)
     : name_(std::move(name)),
       type_(type),
       running_(false),
+      enable_loader_(enable_loader),
+      enable_executor_(enable_executor),
       load_flag_(false),
       exec_flag_(false) {
+    task_table_.RegisterSubscriber([&] {  // forwards task-table notifications to subscriber_, if one is set
+        if (subscriber_) {
+            auto event = std::make_shared(shared_from_this());
+            subscriber_(std::static_pointer_cast(event));
+        }
+    });
 }
 
 void Resource::Start() {
-    loader_thread_ = std::thread(&Resource::loader_function, this);
-    executor_thread_ = std::thread(&Resource::executor_function, this);
+    running_ = true;  // NOTE(review): running_ is a plain bool read by the worker threads; consider std::atomic<bool>
+    if (enable_loader_) {
+        loader_thread_ = std::thread(&Resource::loader_function, this);
+    }
+    if (enable_executor_) {
+        executor_thread_ = std::thread(&Resource::executor_function, this);
+    }
 }
 
 void Resource::Stop() {
     running_ = false;
-    WakeupLoader();
-    WakeupExecutor();
+    if (enable_loader_) {
+        WakeupLoader();  // wake the loader so it can observe running_ == false
+        loader_thread_.join();
+    }
+    if (enable_executor_) {
+        WakeupExecutor();  // wake the executor so it can observe running_ == false
+        executor_thread_.join();
+    }
 }
 
 TaskTable &Resource::task_table() {
     return task_table_;
 }
 
-void Resource::WakeupExecutor() {
-    exec_cv_.notify_one();
-}
-
 void Resource::WakeupLoader() {
+    std::lock_guard lock(load_mutex_);
+    load_flag_ = true;  // set under load_mutex_ so the waiting predicate sees it
     load_cv_.notify_one();
 }
 
-TaskPtr Resource::pick_task_load() {
+void Resource::WakeupExecutor() {
+    std::lock_guard lock(exec_mutex_);
+    exec_flag_ = true;  // set under exec_mutex_ so the waiting predicate sees it
+    exec_cv_.notify_one();
+}
+
+TaskTableItemPtr Resource::pick_task_load() {
     auto indexes = PickToLoad(task_table_, 3);
     for (auto index : indexes) {
         // try to set one task loading, then return
         if (task_table_.Load(index))
-            return task_table_.Get(index).task;
+            return task_table_.Get(index);
         // else try next
     }
     return nullptr;
 }
 
-TaskPtr Resource::pick_task_execute() {
+TaskTableItemPtr Resource::pick_task_execute() {
     auto indexes = PickToExecute(task_table_, 3);
     for (auto index : indexes) {
         // try to set one task executing, then return
         if (task_table_.Execute(index))
-            return task_table_.Get(index).task;
+            return task_table_.Get(index);
         // else try next
     }
     return nullptr;
@@ -67,24 +98,50 @@ void Resource::loader_function() {
     while (running_) {
         std::unique_lock lock(load_mutex_);
         load_cv_.wait(lock, [&] { return load_flag_; });
-        auto task = pick_task_load();
-        if (task) {
-            LoadFile(task);
-            GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec();
+        load_flag_ = false;
+        // Release the mutex before the work so WakeupLoader() never blocks behind LoadFile() or the subscriber.
+        lock.unlock();
+        while (true) {  // drain every task that can currently be loaded
+            auto task_item = pick_task_load();
+            if (task_item == nullptr) {
+                break;
+            }
+            LoadFile(task_item->task);
+            // TODO: wrapper loaded
+            task_item->state = TaskTableItemState::LOADED;
+            if (subscriber_) {
+                auto event = std::make_shared(shared_from_this(), task_item);
+                subscriber_(std::static_pointer_cast(event));
+            }
         }
+
     }
 }
 
 void Resource::executor_function() {
-    GetRegisterFunc(RegisterType::START_UP)->Exec();
+    if (subscriber_) {  // announce start-up once, before entering the wait loop
+        auto event = std::make_shared(shared_from_this());
+        subscriber_(std::static_pointer_cast(event));
+    }
     while (running_) {
         std::unique_lock lock(exec_mutex_);
         exec_cv_.wait(lock, [&] { return exec_flag_; });
-        auto task = pick_task_execute();
-        if (task) {
-            Process(task);
-            GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec();
+        exec_flag_ = false;
+        // Release the mutex before the work so WakeupExecutor() never blocks behind Process() or the subscriber.
+        lock.unlock();
+        while (true) {  // drain every task that can currently be executed
+            auto task_item = pick_task_execute();
+            if (task_item == nullptr) {
+                break;
+            }
+            Process(task_item->task);
+            task_item->state = TaskTableItemState::EXECUTED;
+            if (subscriber_) {
+                auto event = std::make_shared(shared_from_this(), task_item);
+                subscriber_(std::static_pointer_cast(event));
+            }
         }
+
     }
 }
 
diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h
index 2961e281fa..c32149b46e 100644
--- a/cpp/src/scheduler/resource/Resource.h
+++ b/cpp/src/scheduler/resource/Resource.h
@@ -12,6 +12,11 @@
 #include
 #include
 
+#include "../event/Event.h"
+#include "../event/StartUpEvent.h"
+#include "../event/CopyCompletedEvent.h"
+#include "../event/FinishTaskEvent.h"
+#include "../event/TaskTableUpdatedEvent.h"
 #include "../TaskTable.h"
 #include "../task/Task.h"
 #include "../Cost.h"
@@ -37,18 +42,28 @@ enum class RegisterType {
     ON_TASK_TABLE_UPDATED,
 };
 
-class Resource : public Node {
+class Resource : public Node, public std::enable_shared_from_this {
 public:
     /*
      * Event function MUST be a short function, never blocking;
      */
-    template
-    void Register_T(const RegisterType& type) {
+    template
+    void Register_T(const RegisterType &type) {
         register_table_.emplace(type, [] { return std::make_shared(); });
     }
 
     RegisterHandlerPtr
-    GetRegisterFunc(const RegisterType& type);
+    GetRegisterFunc(const RegisterType &type);
+
+    inline void
+    RegisterSubscriber(std::function subscriber) {
+        subscriber_ = std::move(subscriber);  // replaces any previously registered subscriber
+    }
+
+    inline ResourceType
+    Type() const {
+        return type_;
+    }
 
     void
     Start();
@@ -59,21 +74,31 @@ public:
     TaskTable &
     task_table();
 
+    inline virtual std::string
+    Dump() const {
+        return "";
+    }
+
+    friend std::ostream &operator<<(std::ostream &out, const Resource &resource);
+
 public:
+    /*
+     * wake up loader;
+     */
+    void
+    WakeupLoader();
+
     /*
      * wake up executor;
      */
     void
     WakeupExecutor();
 
-    /*
-     * wake up loader;
-     */
-    void
-    WakeupLoader();
-
 protected:
-    Resource(std::string name, ResourceType type);
+    Resource(std::string name,
+             ResourceType type,
+             bool enable_loader = true,
+             bool enable_executor = true);
 
     // TODO: SearchContextPtr to TaskPtr
     /*
@@ -100,14 +125,14 @@ private:
      * Pick one task to load;
      * Order by start time;
      */
-    TaskPtr
+    TaskTableItemPtr
     pick_task_load();
 
     /*
      * Pick one task to execute;
      * Pick by start time and priority;
      */
-    TaskPtr
+    TaskTableItemPtr
     pick_task_execute();
 
 private:
@@ -123,7 +148,6 @@ private:
     void
     executor_function();
 
-
 private:
     std::string name_;
     ResourceType type_;
@@ -131,8 +155,11 @@ private:
     TaskTable task_table_;
 
     std::map> register_table_;
+    std::function subscriber_ = nullptr;
 
     bool running_;
+    bool enable_loader_ = true;
+    bool enable_executor_ = true;
 
     std::thread loader_thread_;
     std::thread executor_thread_;
diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp
new file mode 100644
index 0000000000..a974482e52
--- /dev/null
+++ b/cpp/src/scheduler/task/TestTask.cpp
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "TestTask.h"
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+void
+TestTask::Load(LoadType type, uint8_t device_id) {
+    load_count_++;  // only counts the call; both arguments are ignored
+}
+
+void
+TestTask::Execute() {
+    exec_count_++;  // only counts the call
+}
+
+}
+}
+}
+
diff --git a/cpp/src/scheduler/task/TestTask.h b/cpp/src/scheduler/task/TestTask.h
new file mode 100644
index 0000000000..5f49d2e31e
--- /dev/null
+++ b/cpp/src/scheduler/task/TestTask.h
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "Task.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+class TestTask : public Task {  // task whose Load()/Execute() only count their calls
+public:
+    TestTask() = default;
+
+public:
+    void
+    Load(LoadType type, uint8_t device_id) override;
+
+    void
+    Execute() override;
+
+public:
+    uint64_t load_count_ = 0;  // number of Load() calls; starts at 0 however the object is created
+    uint64_t exec_count_ = 0;  // number of Execute() calls; starts at 0 however the object is created
+};
+
+
+}
+}
+}
diff --git a/cpp/src/wrapper/knowhere/vec_impl.cpp b/cpp/src/wrapper/knowhere/vec_impl.cpp
index d1cc1ae4ff..f6bdd82618 100644
--- a/cpp/src/wrapper/knowhere/vec_impl.cpp
+++ b/cpp/src/wrapper/knowhere/vec_impl.cpp
@@ -8,6 +8,7 @@
 #include "knowhere/index/vector_index/idmap.h"
 #include "knowhere/index/vector_index/gpu_ivf.h"
 #include "knowhere/common/exception.h"
+#include "knowhere/index/vector_index/cloner.h"
 
 #include "vec_impl.h"
 #include "data_transfer.h"
@@ -152,6 +153,21 @@ VecIndexPtr VecIndexImpl::CopyToCpu(const Config &cfg) {
     return std::make_shared(cpu_index, type);
 }
 
+VecIndexPtr VecIndexImpl::Clone() {
+    auto clone_index = std::make_shared(index_->Clone(), type);
+    clone_index->dim = dim;  // carry the dimension over to the clone
+    return clone_index;
+}
+
+int64_t VecIndexImpl::GetDeviceId() {
+    if (auto device_idx = std::dynamic_pointer_cast(index_)){  // true only when the cast on index_ succeeds
+        return device_idx->GetGpuDevice();
+    }
+    else {
+        return -1; // -1 == cpu
+    }
+}
+
 float *BFIndex::GetRawVectors() {
     auto raw_index = std::dynamic_pointer_cast(index_);
     if (raw_index) { return raw_index->GetRawVectors(); }
diff --git a/cpp/src/wrapper/knowhere/vec_impl.h b/cpp/src/wrapper/knowhere/vec_impl.h
index 5e46c16f70..f03f299f78 100644
--- a/cpp/src/wrapper/knowhere/vec_impl.h
+++ b/cpp/src/wrapper/knowhere/vec_impl.h
@@ -33,6 +33,8 @@ class VecIndexImpl : public VecIndex {
     server::KnowhereError Add(const long &nb, const float *xb, const long *ids, const Config &cfg) override;
     zilliz::knowhere::BinarySet Serialize() override;
     server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) override;
+    VecIndexPtr Clone() override;
+    int64_t GetDeviceId() override;
     server::KnowhereError Search(const long &nq, const float *xq, float *dist, long *ids, const Config &cfg) override;
 
  protected:
diff --git a/cpp/src/wrapper/knowhere/vec_index.h b/cpp/src/wrapper/knowhere/vec_index.h
index 088228386c..19f0c6d360 100644
--- a/cpp/src/wrapper/knowhere/vec_index.h
+++ b/cpp/src/wrapper/knowhere/vec_index.h
@@ -63,6 +63,10 @@ class VecIndex {
 
     virtual VecIndexPtr CopyToCpu(const Config &cfg = Config()) = 0;
 
+    virtual VecIndexPtr Clone() = 0;
+
+    virtual int64_t GetDeviceId() = 0;
+
     virtual IndexType GetType() = 0;
 
     virtual int64_t Dimension() = 0;
diff --git a/cpp/unittest/scheduler/CMakeLists.txt b/cpp/unittest/scheduler/CMakeLists.txt
index c38f4736f7..24210cb84d 100644
--- a/cpp/unittest/scheduler/CMakeLists.txt
+++ b/cpp/unittest/scheduler/CMakeLists.txt
@@ -14,6 +14,8 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
 aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
 aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
 
+aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs)
+aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs)
 aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs)
 aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs)
 aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
@@ -35,6 +37,9 @@ include_directories(/usr/include/mysql)
 
 set(scheduler_test_src
         ${unittest_srcs}
+        ${test_srcs}
+        ${scheduler_action_srcs}
+        ${scheduler_event_srcs}
         ${scheduler_resource_srcs}
         ${scheduler_task_srcs}
         ${scheduler_srcs}
diff --git a/cpp/unittest/scheduler/cost_test.cpp b/cpp/unittest/scheduler/cost_test.cpp
new file mode 100644
index 0000000000..d4c05257d1
--- /dev/null
+++ b/cpp/unittest/scheduler/cost_test.cpp
@@ -0,0 +1,47 @@
+#include "scheduler/TaskTable.h"
+#include "scheduler/Cost.h"
+#include
+
+
+using namespace zilliz::milvus::engine;
+
+class CostTest : public ::testing::Test {  // fixture: a TaskTable with one entry per TaskTableItemState used below
+protected:
+    void
+    SetUp() override {
+        for (uint64_t i = 0; i < 8; ++i) {  // eight entries, indexes 0..7
+            auto task = std::make_shared();
+            table_.Put(task);
+        }
+        table_.Get(0)->state = TaskTableItemState::INVALID;
+        table_.Get(1)->state = TaskTableItemState::START;
+        table_.Get(2)->state = TaskTableItemState::LOADING;
+        table_.Get(3)->state = TaskTableItemState::LOADED;
+        table_.Get(4)->state = TaskTableItemState::EXECUTING;
+        table_.Get(5)->state = TaskTableItemState::EXECUTED;
+        table_.Get(6)->state = TaskTableItemState::MOVING;
+        table_.Get(7)->state = TaskTableItemState::MOVED;
+    }
+
+
+    TaskTable table_;
+};
+
+TEST_F(CostTest, pick_to_move) {
+    CacheMgr cache;
+    auto indexes = PickToMove(table_, cache, 10);
+    ASSERT_EQ(indexes.size(), 1);
+    ASSERT_EQ(indexes[0], 3);  // index 3 is the entry set to LOADED in SetUp
+}
+
+TEST_F(CostTest, pick_to_load) {
+    auto indexes = PickToLoad(table_, 10);
+    ASSERT_EQ(indexes.size(), 1);
+    ASSERT_EQ(indexes[0], 1);  // index 1 is the entry set to START in SetUp
+}
+
+TEST_F(CostTest, pick_to_executed) {
+    auto indexes = PickToExecute(table_, 10);
+    ASSERT_EQ(indexes.size(), 1);
+    ASSERT_EQ(indexes[0], 3);  // index 3 is the entry set to LOADED in SetUp
+}
diff --git
a/cpp/unittest/scheduler/node_test.cpp b/cpp/unittest/scheduler/node_test.cpp
index a2249e715b..f0621043db 100644
--- a/cpp/unittest/scheduler/node_test.cpp
+++ b/cpp/unittest/scheduler/node_test.cpp
@@ -10,6 +10,8 @@ protected:
     SetUp() override {
         node1_ = std::make_shared();
         node2_ = std::make_shared();
+        node3_ = std::make_shared();
+        node4_ = std::make_shared();
 
         auto pcie = Connection("PCIe", 11.0);
 
diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp
index af5d1b91f8..4d1fa36de8 100644
--- a/cpp/unittest/scheduler/normal_test.cpp
+++ b/cpp/unittest/scheduler/normal_test.cpp
@@ -1,13 +1,14 @@
 #include "scheduler/ResourceFactory.h"
 #include "scheduler/ResourceMgr.h"
 #include "scheduler/Scheduler.h"
+#include "scheduler/task/TestTask.h"
+#include "utils/Log.h"
 #include
 
 
 using namespace zilliz::milvus::engine;
 
-TEST(normal_test, DISABLED_test1) {
-
+TEST(normal_test, test1) {
     // ResourceMgr only compose resources, provide unified event
     auto res_mgr = std::make_shared();
     auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd"));
@@ -23,17 +24,62 @@ TEST(normal_test, DISABLED_test1) {
 
     res_mgr->Start();
 
-    auto task1 = std::make_shared();
-    auto task2 = std::make_shared();
-    if (auto observe = disk.lock()) {
-        observe->task_table().Put(task1);
-        observe->task_table().Put(task2);
-        observe->task_table().Put(task1);
-        observe->task_table().Put(task1);
-    }
-
     auto scheduler = new Scheduler(res_mgr);
     scheduler->Start();
 
-    while (true) sleep(1);
+    auto task1 = std::make_shared();
+    auto task2 = std::make_shared();
+    auto task3 = std::make_shared();
+    auto task4 = std::make_shared();
+    if (auto observe = disk.lock()) {  // tasks are queued on the disk resource after the scheduler has started
+        observe->task_table().Put(task1);
+        observe->task_table().Put(task2);
+        observe->task_table().Put(task3);
+        observe->task_table().Put(task4);
+    }
+
+//    if (auto disk_r = disk.lock()) {
+//        if (auto cpu_r = cpu.lock()) {
+//            if (auto gpu1_r = gpu1.lock()) {
+//                if (auto gpu2_r = gpu2.lock()) {
+//                    std::cout << "<<<<<<<<<<before<<<<<<<<<<" << std::endl;
+//                    std::cout << "disk:" << std::endl;
+//                    std::cout << disk_r->task_table().Dump() << std::endl;
+//                    std::cout << "cpu:" << std::endl;
+//                    std::cout << cpu_r->task_table().Dump() << std::endl;
+//                    std::cout << "gpu1:" << std::endl;
+//                    std::cout << gpu1_r->task_table().Dump() << std::endl;
+//                    std::cout << "gpu2:" << std::endl;
+//                    std::cout << gpu2_r->task_table().Dump() << std::endl;
+//                    std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl;
+//                }
+//            }
+//        }
+//    }
+
+    sleep(5);  // fixed 5-second wait before stopping; the asserts below depend on the work finishing within it
+
+//    if (auto disk_r = disk.lock()) {
+//        if (auto cpu_r = cpu.lock()) {
+//            if (auto gpu1_r = gpu1.lock()) {
+//                if (auto gpu2_r = gpu2.lock()) {
+//                    std::cout << "<<<<<<<<<<after<<<<<<<<<<" << std::endl;
+//                    std::cout << "disk:" << std::endl;
+//                    std::cout << disk_r->task_table().Dump() << std::endl;
+//                    std::cout << "cpu:" << std::endl;
+//                    std::cout << cpu_r->task_table().Dump() << std::endl;
+//                    std::cout << "gpu1:" << std::endl;
+//                    std::cout << gpu1_r->task_table().Dump() << std::endl;
+//                    std::cout << "gpu2:" << std::endl;
+//                    std::cout << gpu2_r->task_table().Dump() << std::endl;
+//                    std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl;
+//                }
+//            }
+//        }
+//    }
+    scheduler->Stop();
+    res_mgr->Stop();
+
+    ASSERT_EQ(task1->load_count_, 1);  // only task1 is checked
+    ASSERT_EQ(task1->exec_count_, 1);
 }
diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp
new file mode 100644
index 0000000000..2f7d58eb57
--- /dev/null
+++ b/cpp/unittest/scheduler/resource_test.cpp
@@ -0,0 +1,144 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "scheduler/resource/Resource.h"
+#include "scheduler/resource/DiskResource.h"
+#include "scheduler/resource/CpuResource.h"
+#include "scheduler/resource/GpuResource.h"
+#include "scheduler/task/Task.h"
+#include "scheduler/task/TestTask.h"
+#include "scheduler/ResourceFactory.h"
+#include
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+class ResourceTest : public testing::Test {  // fixture: starts disk/cpu/gpu resources and counts their events
+protected:
+    void
+    SetUp() override {
+        disk_resource_ = ResourceFactory::Create("disk");
+        cpu_resource_ = ResourceFactory::Create("cpu");
+        gpu_resource_ = ResourceFactory::Create("gpu");
+        resources_.push_back(disk_resource_);
+        resources_.push_back(cpu_resource_);
+        resources_.push_back(gpu_resource_);
+
+        auto subscriber = [&](EventPtr event) {  // runs on the resources' worker threads
+            if (event->Type() == EventType::COPY_COMPLETED) {
+                std::lock_guard lock(load_mutex_);  // load_count_ is guarded by load_mutex_
+                ++load_count_;
+                cv_.notify_one();
+            }
+
+            if (event->Type() == EventType::FINISH_TASK) {
+                std::lock_guard lock(exec_mutex_);  // exec_count_ is guarded by exec_mutex_, the mutex WaitExecutor() holds
+                ++exec_count_;
+                cv_.notify_one();
+            }
+        };
+
+        disk_resource_->RegisterSubscriber(subscriber);
+        cpu_resource_->RegisterSubscriber(subscriber);
+        gpu_resource_->RegisterSubscriber(subscriber);
+
+        disk_resource_->Start();
+        cpu_resource_->Start();
+        gpu_resource_->Start();
+    }
+
+    void
+    TearDown() override {
+        disk_resource_->Stop();
+        cpu_resource_->Stop();
+        gpu_resource_->Stop();
+    }
+
+    void
+    WaitLoader(uint64_t count) {  // blocks until load_count_ equals count exactly
+        std::unique_lock lock(load_mutex_);
+        cv_.wait(lock, [&] { return load_count_ == count; });
+    }
+
+    void
+    WaitExecutor(uint64_t count) {  // blocks until exec_count_ equals count exactly
+        std::unique_lock lock(exec_mutex_);
+        cv_.wait(lock, [&] { return exec_count_ == count; });
+    }
+
+    ResourcePtr disk_resource_;
+    ResourcePtr cpu_resource_;
+    ResourcePtr gpu_resource_;
+    std::vector resources_;
+    uint64_t load_count_ = 0;  // COPY_COMPLETED events seen
+    uint64_t exec_count_ = 0;  // FINISH_TASK events seen
+    std::mutex load_mutex_;
+    std::mutex exec_mutex_;
+    std::condition_variable cv_;  // shared by both waits; the test never waits on both at once
+};
+
+TEST_F(ResourceTest, cpu_resource_test) {
+    const uint64_t NUM = 100;
+    std::vector> tasks;
+    for (uint64_t i = 0; i < NUM; ++i) {  // queue NUM tasks directly on the cpu resource
+        auto task = std::make_shared();
+        tasks.push_back(task);
+        cpu_resource_->task_table().Put(task);
+    }
+
+    cpu_resource_->WakeupLoader();
+    WaitLoader(NUM);  // returns once NUM COPY_COMPLETED events have been counted
+//    std::cout << "after WakeupLoader" << std::endl;
+//    std::cout << cpu_resource_->task_table().Dump();
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        ASSERT_EQ(tasks[i]->load_count_, 1);  // each task loaded exactly once
+    }
+
+    cpu_resource_->WakeupExecutor();
+    WaitExecutor(NUM);  // returns once NUM FINISH_TASK events have been counted
+//    std::cout << "after WakeupExecutor" << std::endl;
+//    std::cout << cpu_resource_->task_table().Dump();
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        ASSERT_EQ(tasks[i]->exec_count_, 1);  // each task executed exactly once
+    }
+}
+
+TEST_F(ResourceTest, gpu_resource_test) {
+    const uint64_t NUM = 100;
+    std::vector> tasks;
+    for (uint64_t i = 0; i < NUM; ++i) {  // queue NUM tasks directly on the gpu resource
+        auto task = std::make_shared();
+        tasks.push_back(task);
+        gpu_resource_->task_table().Put(task);
+    }
+
+    gpu_resource_->WakeupLoader();
+    WaitLoader(NUM);  // returns once NUM COPY_COMPLETED events have been counted
+//    std::cout << "after WakeupLoader" << std::endl;
+//    std::cout << gpu_resource_->task_table().Dump();
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        ASSERT_EQ(tasks[i]->load_count_, 1);  // each task loaded exactly once
+    }
+
+    gpu_resource_->WakeupExecutor();
+    WaitExecutor(NUM);  // returns once NUM FINISH_TASK events have been counted
+//    std::cout << "after WakeupExecutor" << std::endl;
+//    std::cout << gpu_resource_->task_table().Dump();
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        ASSERT_EQ(tasks[i]->exec_count_, 1);  // each task executed exactly once
+    }
+}
+
+
+}
+}
+}
diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp
new file mode 100644
index 0000000000..787ae59329
--- /dev/null
+++ b/cpp/unittest/scheduler/scheduler_test.cpp
@@ -0,0 +1,17 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+
+}
+}
+}
diff --git a/cpp/unittest/scheduler/tasktable_test.cpp b/cpp/unittest/scheduler/tasktable_test.cpp
index 68255b8e93..f48db3af2d 100644
--- a/cpp/unittest/scheduler/tasktable_test.cpp
+++ b/cpp/unittest/scheduler/tasktable_test.cpp
@@ -45,8 +45,6 @@ protected:
         invalid_task_ = nullptr;
         task1_ = std::make_shared();
         task2_ = std::make_shared();
-
-        empty_table_ = TaskTable();
     }
 
     TaskPtr invalid_task_;
@@ -58,19 +56,19 @@ protected:
 
 TEST_F(TaskTableBaseTest, put_task) {
     empty_table_.Put(task1_);
-    ASSERT_EQ(empty_table_.Get(0).task, task1_);
+    ASSERT_EQ(empty_table_.Get(0)->task, task1_);  // Get() result is now dereferenced with ->
 }
 
 TEST_F(TaskTableBaseTest, put_invalid_test) {
     empty_table_.Put(invalid_task_);
-    ASSERT_EQ(empty_table_.Get(0).task, invalid_task_);
+    ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_);
 }
 
 TEST_F(TaskTableBaseTest, put_batch) {
     std::vector tasks{task1_, task2_};
     empty_table_.Put(tasks);
-    ASSERT_EQ(empty_table_.Get(0).task, task1_);
-    ASSERT_EQ(empty_table_.Get(1).task, task2_);
+    ASSERT_EQ(empty_table_.Get(0)->task, task1_);
+    ASSERT_EQ(empty_table_.Get(1)->task, task2_);
 }
 
 TEST_F(TaskTableBaseTest, put_empty_batch) {
@@ -89,14 +87,14 @@ protected:
             table1_.Put(task);
         }
 
-        table1_.Get(0).state = TaskTableItemState::INVALID;
-        table1_.Get(1).state = TaskTableItemState::START;
-        table1_.Get(2).state = TaskTableItemState::LOADING;
-        table1_.Get(3).state = TaskTableItemState::LOADED;
-        table1_.Get(4).state = TaskTableItemState::EXECUTING;
-        table1_.Get(5).state = TaskTableItemState::EXECUTED;
-        table1_.Get(6).state = TaskTableItemState::MOVING;
-        table1_.Get(7).state = TaskTableItemState::MOVED;
+        table1_.Get(0)->state = TaskTableItemState::INVALID;  // one entry per state, indexes 0..7
+        table1_.Get(1)->state = TaskTableItemState::START;
+        table1_.Get(2)->state = TaskTableItemState::LOADING;
+        table1_.Get(3)->state = TaskTableItemState::LOADED;
+        table1_.Get(4)->state = TaskTableItemState::EXECUTING;
+        table1_.Get(5)->state = TaskTableItemState::EXECUTED;
+        table1_.Get(6)->state = TaskTableItemState::MOVING;
+        table1_.Get(7)->state = TaskTableItemState::MOVED;
     }
 
     TaskTable table1_;
@@ -106,22 +104,22 @@ TEST_F(TaskTableAdvanceTest, load) {
     table1_.Load(1);
     table1_.Loaded(2);
 
-    ASSERT_EQ(table1_.Get(1).state, TaskTableItemState::LOADING);
-    ASSERT_EQ(table1_.Get(2).state, TaskTableItemState::LOADED);
+    ASSERT_EQ(table1_.Get(1)->state, TaskTableItemState::LOADING);
+    ASSERT_EQ(table1_.Get(2)->state, TaskTableItemState::LOADED);
 }
 
 TEST_F(TaskTableAdvanceTest, execute) {
     table1_.Execute(3);
     table1_.Executed(4);
 
-    ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::EXECUTING);
-    ASSERT_EQ(table1_.Get(4).state, TaskTableItemState::EXECUTED);
+    ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::EXECUTING);
+    ASSERT_EQ(table1_.Get(4)->state, TaskTableItemState::EXECUTED);
 }
 
 TEST_F(TaskTableAdvanceTest, move) {
     table1_.Move(3);
     table1_.Moved(6);
 
-    ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::MOVING);
-    ASSERT_EQ(table1_.Get(6).state, TaskTableItemState::MOVED);
+    ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::MOVING);
+    ASSERT_EQ(table1_.Get(6)->state, TaskTableItemState::MOVED);
 }
diff --git a/cpp/unittest/server/cache_test.cpp b/cpp/unittest/server/cache_test.cpp
index df2a869e5c..4f1d1db4ef 100644
--- a/cpp/unittest/server/cache_test.cpp
+++ b/cpp/unittest/server/cache_test.cpp
@@ -39,6 +39,14 @@ public:
 
     }
 
+    engine::VecIndexPtr Clone() override {
+        return zilliz::milvus::engine::VecIndexPtr();  // stub: returns a default-constructed VecIndexPtr
+    }
+
+    int64_t GetDeviceId() override {
+        return 0;  // NOTE(review): stub returns 0, while VecIndexImpl::GetDeviceId in this patch uses -1 for cpu -- confirm intended
+    }
+
     engine::IndexType GetType() override {
         return engine::IndexType::INVALID;
     }