diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 268940e8e3..2a9146dc07 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -57,6 +57,8 @@ Please mark all change in change log and use the ticket from JIRA. - MS-415 - Add command tasktable to dump all tasktables - MS-418 - Update server_config.template file, set CPU compute only default - MS-419 - Move index_file_size from IndexParam to TableSchema +- MS-421 - Add TaskLabel in scheduler +- MS-422 - Support DeleteTask in Multi-GpuResource case ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 96e2589382..b0af3890fc 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -54,7 +54,7 @@ ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connect auto res2 = get_resource_by_name(name2); if (res1 && res2) { res1->AddNeighbour(std::static_pointer_cast(res2), connection); - res2->AddNeighbour(std::static_pointer_cast(res1), connection); +// res2->AddNeighbour(std::static_pointer_cast(res1), connection); } } diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 85fa90585c..775cf76ba2 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -104,18 +104,29 @@ Scheduler::OnStartUp(const EventPtr &event) { void Scheduler::OnFinishTask(const EventPtr &event) { - if (auto resource = event->resource_.lock()) { - } } void Scheduler::OnCopyCompleted(const EventPtr &event) { + auto load_completed_event = std::static_pointer_cast(event); if (auto resource = event->resource_.lock()) { resource->WakeupExecutor(); - if (resource->Type() == ResourceType::DISK) { - Action::PushTaskToNeighbour(event->resource_); - } else { - Action::PushTaskToNeighbourHasExecutor(event->resource_); + + auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); + switch (task_table_type) { + case TaskLabelType::DEFAULT: { + if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) { + Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource); + } + break; + } + case TaskLabelType::BROADCAST: { + Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); + break; + } + default: { + break; + } } } } diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h index 7150887185..8315ecb0ff 100644 --- a/cpp/src/scheduler/action/Action.h +++ b/cpp/src/scheduler/action/Action.h @@ -14,23 +14,11 @@ namespace engine { class Action { public: - /* - * Push task to neighbour; - */ static void - PushTaskToNeighbour(const ResourceWPtr &self); + PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self); - /* - * Push task to neighbour that has executor; - */ static void - PushTaskToNeighbourHasExecutor(const ResourceWPtr &self); - - /* - * Pull task From neighbour; - */ - static void - PullTaskFromNeighbour(const ResourceWPtr &self); + PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self); }; diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 9afeac688a..6b2ee44267 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -13,54 +13,9 @@ namespace zilliz { namespace milvus { namespace engine { -void -next(std::list &neighbours, std::list::iterator &it) { - it++; - if (neighbours.end() == it) { - it = neighbours.begin(); - } -} - -// TODO: this function called with only on tasks, so it will always push task to first neighbour -void -push_task_round_robin(TaskTable &self_task_table, std::list &neighbours) { - CacheMgr cache; - auto it = neighbours.begin(); - if (it == neighbours.end()) return; - auto indexes = PickToMove(self_task_table, cache, self_task_table.Size()); - - for (auto index : indexes) { - if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index)->task; -// task = task->Clone(); - (*it)->task_table().Put(task); - next(neighbours, it); - } - } -} - -void -push_task_randomly(TaskTable &self_task_table, std::vector &neighbours) { - std::random_device rd; - std::mt19937 mt(rd()); - std::uniform_int_distribution dist(0, neighbours.size() - 1); - CacheMgr cache; - - auto indexes = PickToMove(self_task_table, cache, self_task_table.Size()); - for (auto index : indexes) { - if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index)->task; - neighbours[dist(mt)]->task_table().Put(task); - } - } -} - -void -Action::PushTaskToNeighbour(const ResourceWPtr &res) { - auto self = res.lock(); - if (not self) return; - - std::list neighbours; +std::vector +get_neighbours(const ResourcePtr &self) { + std::vector neighbours; for (auto &neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; @@ -68,30 +23,27 @@ Action::PushTaskToNeighbour(const ResourceWPtr &res) { auto resource = std::static_pointer_cast(node); neighbours.emplace_back(resource); } + return neighbours; +} - push_task_round_robin(self->task_table(), neighbours); + +void +Action::PushTaskToNeighbourRandomly(const TaskPtr &task, + const ResourcePtr &self) { + auto neighbours = get_neighbours(self); + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0, neighbours.size() - 1); + + neighbours[dist(mt)]->task_table().Put(task); } void -Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) { - auto self = res.lock(); - if (not self) return; - - std::list l_neighbours; - std::vector v_neighbours; - for (auto &neighbour_node : self->GetNeighbours()) { - auto node = neighbour_node.neighbour_node.lock(); - if (not node) continue; - - auto resource = std::static_pointer_cast(node); - if (resource->HasExecutor()) { - l_neighbours.push_back(resource); - v_neighbours.push_back(resource); - } +Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { + auto neighbours = get_neighbours(self); + for (auto &neighbour : neighbours) { + neighbour->task_table().Put(task); } - -// push_task_round_robin(self->task_table(), l_neighbours); - push_task_randomly(self->task_table(), v_neighbours); } diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 5a6ae28cbc..e55f84dea2 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -29,6 +29,7 @@ namespace zilliz { namespace milvus { namespace engine { +// TODO(wxyu): Storage, Route, Executor enum class ResourceType { DISK = 0, CPU = 1, diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index bc2dae464b..31a1a88404 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -5,10 +5,12 @@ ******************************************************************************/ #pragma once +#include "db/scheduler/context/SearchContext.h" +#include "db/scheduler/task/IScheduleTask.h" +#include "scheduler/tasklabel/TaskLabel.h" + #include #include -#include -#include "src/db/scheduler/task/IScheduleTask.h" namespace zilliz { @@ -36,6 +38,21 @@ public: explicit Task(TaskType type) : type_(type) {} + /* + * Just Getter; + */ + inline TaskType + Type() const { return type_; } + + /* + * Getter and Setter; + */ + inline TaskLabelPtr & + label() { + return label_; + } + +public: virtual void Load(LoadType type, uint8_t device_id) = 0; @@ -46,13 +63,11 @@ public: virtual TaskPtr Clone() = 0; - inline TaskType - Type() const { return type_; } - public: std::vector search_contexts_; ScheduleTaskPtr task_; TaskType type_; + TaskLabelPtr label_ = nullptr; }; diff --git a/cpp/src/scheduler/task/TaskConvert.cpp b/cpp/src/scheduler/task/TaskConvert.cpp index 43f70903dc..30a3a38b26 100644 --- a/cpp/src/scheduler/task/TaskConvert.cpp +++ b/cpp/src/scheduler/task/TaskConvert.cpp @@ -5,6 +5,8 @@ ******************************************************************************/ #include "TaskConvert.h" +#include "scheduler/tasklabel/DefaultLabel.h" +#include "scheduler/tasklabel/BroadcastLabel.h" namespace zilliz { @@ -17,6 +19,7 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) { case ScheduleTaskType::kIndexLoad: { auto load_task = std::static_pointer_cast(schedule_task); auto task = std::make_shared(load_task->file_); + task->label() = std::make_shared(); task->search_contexts_ = load_task->search_contexts_; task->task_ = schedule_task; return task; @@ -24,6 +27,7 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) { case ScheduleTaskType::kDelete: { auto delete_task = std::static_pointer_cast(schedule_task); auto task = std::make_shared(delete_task->context_); + task->label() = std::make_shared(); return task; } default: { diff --git a/cpp/src/scheduler/tasklabel/BroadcastLabel.h b/cpp/src/scheduler/tasklabel/BroadcastLabel.h new file mode 100644 index 0000000000..406add5122 --- /dev/null +++ b/cpp/src/scheduler/tasklabel/BroadcastLabel.h @@ -0,0 +1,27 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "TaskLabel.h" + +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + + +class BroadcastLabel : public TaskLabel { +public: + BroadcastLabel() : TaskLabel(TaskLabelType::BROADCAST) {} +}; + +using BroadcastLabelPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp b/cpp/src/scheduler/tasklabel/DefaultLabel.h similarity index 65% rename from cpp/src/scheduler/action/PullTaskFromNeighbour.cpp rename to cpp/src/scheduler/tasklabel/DefaultLabel.h index b1ac97b6e4..ada34cd679 100644 --- a/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp +++ b/cpp/src/scheduler/tasklabel/DefaultLabel.h @@ -3,19 +3,23 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ +#pragma once -#include "Action.h" +#include "TaskLabel.h" + +#include namespace zilliz { namespace milvus { namespace engine { -void -Action::PullTaskFromNeighbour(const ResourceWPtr &self) { - // TODO: implement -} +class DefaultLabel : public TaskLabel { +public: + DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {} +}; +using DefaultLabelPtr = std::shared_ptr; } } diff --git a/cpp/src/scheduler/tasklabel/SpecResLabel.h b/cpp/src/scheduler/tasklabel/SpecResLabel.h new file mode 100644 index 0000000000..9f69f5752f --- /dev/null +++ b/cpp/src/scheduler/tasklabel/SpecResLabel.h @@ -0,0 +1,47 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "TaskLabel.h" + +#include +#include + + +class Resource; + +using ResourceWPtr = std::weak_ptr; + +namespace zilliz { +namespace milvus { +namespace engine { + +class SpecResLabel : public TaskLabel { +public: + SpecResLabel(const ResourceWPtr &resource) + : TaskLabel(TaskLabelType::SPECIAL_RESOURCE), resource_(resource) {} + + inline ResourceWPtr & + resource() const { + return resource_; + } + + inline std::string & + resource_name() const { + return resource_name_; + } + +private: + ResourceWPtr resource_; + std::string resource_name_; +} + +using SpecResLabelPtr = std::make_shared; + +} +} +} + diff --git a/cpp/src/scheduler/tasklabel/TaskLabel.h b/cpp/src/scheduler/tasklabel/TaskLabel.h new file mode 100644 index 0000000000..3f39b8ec12 --- /dev/null +++ b/cpp/src/scheduler/tasklabel/TaskLabel.h @@ -0,0 +1,39 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class TaskLabelType { + DEFAULT, // means can be executed in any resource + SPECIAL_RESOURCE, // means must executing in special resource + BROADCAST, // means all enable-executor resource must execute task +}; + +class TaskLabel { +public: + inline TaskLabelType + Type() const { + return type_; + } + +protected: + TaskLabel(TaskLabelType type) : type_(type) {} + +private: + TaskLabelType type_; +}; + +using TaskLabelPtr = std::shared_ptr; + +} +} +} +