diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 000a012c0a..d0d353cfa7 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -74,6 +74,8 @@ Please mark all change in change log and use the ticket from JIRA. - MS-442 - Merge Knowhere - MS-445 - Rename CopyCompleted to LoadCompleted - MS-451 - Update server_config.template file, set GPU compute default +- MS-455 - Distribute tasks by minimal cost in scheduler +- MS-460 - Put transport speed as weight when choosing neighbour to execute task - MS-459 - Add cache for pick function in tasktable ## New Feature diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 7af3c54787..80758bfaad 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -54,7 +54,7 @@ set(grpc_service_files grpc/gen-milvus/milvus.pb.cc grpc/gen-status/status.grpc.pb.cc grpc/gen-status/status.pb.cc - ) + scheduler/Utils.h) set(db_files ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp new file mode 100644 index 0000000000..b861151ddf --- /dev/null +++ b/cpp/src/scheduler/Algorithm.cpp @@ -0,0 +1,98 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Algorithm.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +constexpr uint64_t MAXINT = 99999; + +uint64_t +ShortestPath(const ResourcePtr &src, + const ResourcePtr &dest, + const ResourceMgrPtr &res_mgr, + std::vector &path) { + + std::vector> paths; + + uint64_t num_of_resources = res_mgr->GetAllResouces().size(); + std::unordered_map id_name_map; + std::unordered_map name_id_map; + for (uint64_t i = 0; i < num_of_resources; ++i) { + id_name_map.insert(std::make_pair(i, res_mgr->GetAllResouces().at(i)->Name())); + name_id_map.insert(std::make_pair(res_mgr->GetAllResouces().at(i)->Name(), i)); + } + + std::vector > dis_matrix; + dis_matrix.resize(num_of_resources); + for (uint64_t i = 0; i < num_of_resources; ++i) { + dis_matrix[i].resize(num_of_resources); + for (uint64_t j = 0; j < num_of_resources; ++j) { + dis_matrix[i][j] = MAXINT; + } + dis_matrix[i][i] = 0; + } + + std::vector vis(num_of_resources, false); + std::vector dis(num_of_resources, MAXINT); + for (auto &res : res_mgr->GetAllResouces()) { + + auto cur_node = std::static_pointer_cast(res); + auto cur_neighbours = cur_node->GetNeighbours(); + + for (auto &neighbour : cur_neighbours) { + auto neighbour_res = std::static_pointer_cast(neighbour.neighbour_node.lock()); + dis_matrix[name_id_map.at(res->Name())][name_id_map.at(neighbour_res->Name())] = + neighbour.connection.transport_cost(); + } + } + + for (uint64_t i = 0; i < num_of_resources; ++i) { + dis[i] = dis_matrix[name_id_map.at(src->Name())][i]; + } + + vis[name_id_map.at(src->Name())] = true; + std::vector parent(num_of_resources, -1); + + for (uint64_t i = 0; i < num_of_resources; ++i) { + uint64_t minn = MAXINT; + uint64_t temp = 0; + for (uint64_t j = 0; j < num_of_resources; ++j) { + if (!vis[j] && dis[j] < minn) { + minn = dis[j]; + temp = j; + } + } + vis[temp] = true; + + if (i == 0) { + parent[temp] = name_id_map.at(src->Name()); + } + + for (uint64_t j = 0; j < num_of_resources; ++j) { + if (!vis[j] && dis_matrix[temp][j] != MAXINT && dis_matrix[temp][j] + dis[temp] < dis[j]) { + dis[j] = dis_matrix[temp][j] + dis[temp]; + parent[j] = temp; + } + } + } + + int64_t parent_idx = parent[name_id_map.at(dest->Name())]; + if (parent_idx != -1) { + path.push_back(dest->Name()); + } + while (parent_idx != -1) { + path.push_back(id_name_map.at(parent_idx)); + parent_idx = parent[parent_idx]; + } + return dis[name_id_map.at(dest->Name())]; +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/Algorithm.h b/cpp/src/scheduler/Algorithm.h new file mode 100644 index 0000000000..05d9ad71d8 --- /dev/null +++ b/cpp/src/scheduler/Algorithm.h @@ -0,0 +1,25 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "resource/Resource.h" +#include "ResourceMgr.h" + +#include +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +uint64_t +ShortestPath(const ResourcePtr &src, + const ResourcePtr &dest, + const ResourceMgrPtr &res_mgr, + std::vector& path); + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 649f840827..65373164e3 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -28,6 +28,17 @@ ResourceMgr::GetNumOfComputeResource() { return count; } +std::vector +ResourceMgr::GetComputeResource() { + std::vector result; + for (auto &resource : resources_) { + if (resource->HasExecutor()) { + result.emplace_back(resource); + } + } + return result; +} + uint64_t ResourceMgr::GetNumGpuResource() const { uint64_t num = 0; @@ -49,6 +60,21 @@ ResourceMgr::GetResource(ResourceType type, uint64_t device_id) { return nullptr; } +ResourcePtr +ResourceMgr::GetResourceByName(std::string name) { + for (auto &resource : resources_) { + if (resource->Name() == name) { + return resource; + } + } + return nullptr; +} + +std::vector +ResourceMgr::GetAllResouces() { + return resources_; +} + ResourceWPtr ResourceMgr::Add(ResourcePtr &&resource) { ResourceWPtr ret(resource); diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 5083aa1b53..da8f34f87e 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -41,12 +41,21 @@ public: ResourcePtr GetResource(ResourceType type, uint64_t device_id); + ResourcePtr + GetResourceByName(std::string name); + + std::vector + GetAllResouces(); + /* * Return account of resource which enable executor; */ uint64_t GetNumOfComputeResource(); + std::vector + GetComputeResource(); + /* * Add resource into Resource Management; * Generate functions on events; diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 3ee8cbfdb6..43204f0946 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -43,14 +43,21 @@ StartSchedulerService() { knowhere::FaissGpuResourceMgr::GetInstance().InitResource(); - auto default_connection = Connection("default_connection", 500.0); - auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS); +// auto default_connection = Connection("default_connection", 500.0); + auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren(); for (auto &conn : connections) { - std::string delimiter = "==="; - std::string left = conn.substr(0, conn.find(delimiter)); - std::string right = conn.substr(conn.find(delimiter) + 3, conn.length()); + auto &connect_name = conn.first; + auto &connect_conf = conn.second; + auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS); + auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS); - ResMgrInst::GetInstance()->Connect(left, right, default_connection); + std::string delimiter = "==="; + std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter)); + std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3, + connect_endpoint.length()); + + auto connection = Connection(connect_name, connect_speed); + ResMgrInst::GetInstance()->Connect(left, right, connection); } ResMgrInst::GetInstance()->Start(); diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 20183a2876..fa67eef489 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -5,8 +5,10 @@ ******************************************************************************/ #include +#include "event/LoadCompletedEvent.h" #include "Scheduler.h" #include "action/Action.h" +#include "Algorithm.h" namespace zilliz { @@ -136,6 +138,54 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { } break; } + case TaskLabelType::SPECIFIED_RESOURCE: { + auto self = event->resource_.lock(); + auto task = load_completed_event->task_table_item_->task; + + // if this resource is disk, assign it to smallest cost resource + if (self->Type() == ResourceType::DISK) { + // step 1: calculate shortest path per resource, from disk to compute resource + auto compute_resources = res_mgr_.lock()->GetComputeResource(); + std::vector> paths; + std::vector transport_costs; + for (auto &res : compute_resources) { + std::vector path; + uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path); + transport_costs.push_back(transport_cost); + paths.emplace_back(path); + } + + // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + uint64_t min_cost = std::numeric_limits::max(); + uint64_t min_cost_idx = 0; + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->TotalTasks() == 0) { + min_cost_idx = i; + break; + } + uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + + transport_costs[i]; + if (min_cost > cost) { + min_cost = cost; + min_cost_idx = i; + } + } + + // step 3: set path in task + Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); + task->path() = task_path; + } + + if(self->Name() == task->path().Last()) { + self->WakeupLoader(); + } else { + auto next_res_name = task->path().Next(); + auto next_res = res_mgr_.lock()->GetResourceByName(next_res_name); + load_completed_event->task_table_item_->Move(); + next_res->task_table().Put(task); + } + break; + } case TaskLabelType::BROADCAST: { Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); break; diff --git a/cpp/src/scheduler/Utils.cpp b/cpp/src/scheduler/Utils.cpp new file mode 100644 index 0000000000..074c035e8e --- /dev/null +++ b/cpp/src/scheduler/Utils.cpp @@ -0,0 +1,25 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include +#include "Utils.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +uint64_t +get_current_timestamp() +{ + std::chrono::time_point now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto millis = std::chrono::duration_cast(duration).count(); + return millis; +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/Utils.h b/cpp/src/scheduler/Utils.h new file mode 100644 index 0000000000..7a5bf1874d --- /dev/null +++ b/cpp/src/scheduler/Utils.h @@ -0,0 +1,18 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +uint64_t +get_current_timestamp(); + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 1939cbc127..200f6214fe 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -28,17 +28,48 @@ get_neighbours(const ResourcePtr &self) { return neighbours; } +std::vector> +get_neighbours_with_connetion(const ResourcePtr &self) { + std::vector> neighbours; + for (auto &neighbour_node : self->GetNeighbours()) { + auto node = neighbour_node.neighbour_node.lock(); + if (not node) continue; + + auto resource = std::static_pointer_cast(node); +// if (not resource->HasExecutor()) continue; + Connection conn = neighbour_node.connection; + neighbours.emplace_back(std::make_pair(resource, conn)); + } + return neighbours; +} + void Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) { - auto neighbours = get_neighbours(self); + auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { + std::vector speeds; + uint64_t total_speed = 0; + for (auto &neighbour : neighbours) { + uint64_t speed = neighbour.second.speed(); + speeds.emplace_back(speed); + total_speed += speed; + } + std::random_device rd; std::mt19937 mt(rd()); - std::uniform_int_distribution dist(0, neighbours.size() - 1); + std::uniform_int_distribution dist(0, total_speed); + uint64_t index = 0; + int64_t rd_speed = dist(mt); + for (uint64_t i = 0; i < speeds.size(); ++i) { + rd_speed -= speeds[i]; + if (rd_speed <= 0) { + neighbours[i].first->task_table().Put(task); + return; + } + } - neighbours[dist(mt)]->task_table().Put(task); } else { //TODO: process } diff --git a/cpp/src/scheduler/resource/Connection.h b/cpp/src/scheduler/resource/Connection.h index 0f1088e7fe..83c9cc529c 100644 --- a/cpp/src/scheduler/resource/Connection.h +++ b/cpp/src/scheduler/resource/Connection.h @@ -19,15 +19,20 @@ public: : name_(std::move(name)), speed_(speed) {} const std::string & - get_name() const { + name() const { return name_; } - const double - get_speed() const { + uint64_t + speed() const { return speed_; } + uint64_t + transport_cost() { + return 1024 / speed_; + } + public: std::string Dump() const { @@ -38,7 +43,7 @@ public: private: std::string name_; - double speed_; + uint64_t speed_; }; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index d743814699..b4a6cb5b66 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -4,6 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ #include +#include "../Utils.h" #include "Resource.h" @@ -138,7 +139,13 @@ void Resource::executor_function() { if (task_item == nullptr) { break; } + + auto start = get_current_timestamp(); Process(task_item->task); + auto finish = get_current_timestamp(); + ++total_task_; + total_cost_ += finish - start; + task_item->Executed(); if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 6c0f32689f..9169a67cf9 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -43,7 +43,7 @@ enum class RegisterType { }; class Resource : public Node, public std::enable_shared_from_this { -public: + public: /* * Start loader and executor if enable; */ @@ -68,7 +68,7 @@ public: void WakeupExecutor(); -public: + public: template void Register_T(const RegisterType &type) { register_table_.emplace(type, [] { return std::make_shared(); }); @@ -109,6 +109,27 @@ public: return enable_executor_; } + // TODO: const + uint64_t + NumOfTaskToExec() { + uint64_t count = 0; + for (auto &task : task_table_) { + if (task->state == TaskTableItemState::LOADED) ++count; + } + return count; + } + + // TODO: need double ? + inline uint64_t + TaskAvgCost() const { + return total_cost_ / total_task_; + } + + inline uint64_t + TotalTasks() const { + return total_task_; + } + TaskTable & task_table(); @@ -119,7 +140,7 @@ public: friend std::ostream &operator<<(std::ostream &out, const Resource &resource); -protected: + protected: Resource(std::string name, ResourceType type, uint64_t device_id, @@ -141,7 +162,7 @@ protected: virtual void Process(TaskPtr task) = 0; -private: + private: /* * These function should move to cost.h ??? * COST.H ??? @@ -161,7 +182,7 @@ private: TaskTableItemPtr pick_task_execute(); -private: + private: /* * Only called by load thread; */ @@ -174,14 +195,17 @@ private: void executor_function(); -protected: + protected: uint64_t device_id_; std::string name_; -private: + private: ResourceType type_; TaskTable task_table_; + uint64_t total_cost_ = 0; + uint64_t total_task_ = 0; + std::map> register_table_; std::function subscriber_ = nullptr; diff --git a/cpp/src/scheduler/task/Path.h b/cpp/src/scheduler/task/Path.h new file mode 100644 index 0000000000..388a7b9c82 --- /dev/null +++ b/cpp/src/scheduler/task/Path.h @@ -0,0 +1,68 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Path { + public: + Path() = default; + + Path(std::vector& path, uint64_t index) : path_(path), index_(index) {} + + void + push_back(const std::string &str) { + path_.push_back(str); + } + + std::vector + Dump() { + return path_; + } + + std::string + Next() { + if (index_ > 0 && !path_.empty()) { + --index_; + return path_[index_]; + } else { + return nullptr; + } + + } + + std::string + Last() { + if (!path_.empty()) { + return path_[0]; + } else { + return nullptr; + } + } + + public: + std::string & + operator[](uint64_t index) { + return path_[index]; + } + + std::vector::iterator begin() { return path_.begin(); } + std::vector::iterator end() { return path_.end(); } + + public: + std::vector path_; + uint64_t index_ = 0; +}; + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index 31a1a88404..7431679e13 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -8,6 +8,7 @@ #include "db/scheduler/context/SearchContext.h" #include "db/scheduler/task/IScheduleTask.h" #include "scheduler/tasklabel/TaskLabel.h" +#include "Path.h" #include #include @@ -44,6 +45,14 @@ public: inline TaskType Type() const { return type_; } + /* + * Transport path; + */ + inline Path& + path() { + return task_path_; + } + /* * Getter and Setter; */ @@ -64,6 +73,7 @@ public: Clone() = 0; public: + Path task_path_; std::vector search_contexts_; ScheduleTaskPtr task_; TaskType type_; diff --git a/cpp/src/scheduler/tasklabel/SpecResLabel.h b/cpp/src/scheduler/tasklabel/SpecResLabel.h index 9f69f5752f..51468bf28b 100644 --- a/cpp/src/scheduler/tasklabel/SpecResLabel.h +++ b/cpp/src/scheduler/tasklabel/SpecResLabel.h @@ -22,24 +22,24 @@ namespace engine { class SpecResLabel : public TaskLabel { public: SpecResLabel(const ResourceWPtr &resource) - : TaskLabel(TaskLabelType::SPECIAL_RESOURCE), resource_(resource) {} + : TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {} inline ResourceWPtr & - resource() const { + resource() { return resource_; } inline std::string & - resource_name() const { + resource_name() { return resource_name_; } private: ResourceWPtr resource_; std::string resource_name_; -} +}; -using SpecResLabelPtr = std::make_shared; +using SpecResLabelPtr = std::shared_ptr(); } } diff --git a/cpp/src/scheduler/tasklabel/TaskLabel.h b/cpp/src/scheduler/tasklabel/TaskLabel.h index 3f39b8ec12..84fd5ee77b 100644 --- a/cpp/src/scheduler/tasklabel/TaskLabel.h +++ b/cpp/src/scheduler/tasklabel/TaskLabel.h @@ -13,7 +13,7 @@ namespace engine { enum class TaskLabelType { DEFAULT, // means can be executed in any resource - SPECIAL_RESOURCE, // means must executing in special resource + SPECIFIED_RESOURCE, // means must executing in special resource BROADCAST, // means all enable-executor resource must execute task }; diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index 06b6e45fce..8198d5a232 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -18,175 +18,173 @@ using namespace milvus; //#define SET_VECTOR_IDS; namespace { - std::string GetTableName(); +std::string GetTableName(); - const std::string TABLE_NAME = GetTableName(); - constexpr int64_t TABLE_DIMENSION = 512; - constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; - constexpr int64_t BATCH_ROW_COUNT = 1000000; - constexpr int64_t NQ = 100; - constexpr int64_t TOP_K = 10; - constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different - constexpr int64_t ADD_VECTOR_LOOP = 1; - constexpr int64_t SECONDS_EACH_HOUR = 3600; +const std::string TABLE_NAME = GetTableName(); +constexpr int64_t TABLE_DIMENSION = 512; +constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; +constexpr int64_t BATCH_ROW_COUNT = 100000; +constexpr int64_t NQ = 100; +constexpr int64_t TOP_K = 10; +constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different +constexpr int64_t ADD_VECTOR_LOOP = 1; +constexpr int64_t SECONDS_EACH_HOUR = 3600; #define BLOCK_SPLITER std::cout << "===========================================" << std::endl; - void PrintTableSchema(const TableSchema& tb_schema) { - BLOCK_SPLITER - std::cout << "Table name: " << tb_schema.table_name << std::endl; - std::cout << "Table dimension: " << tb_schema.dimension << std::endl; - BLOCK_SPLITER - } +void PrintTableSchema(const TableSchema& tb_schema) { + BLOCK_SPLITER + std::cout << "Table name: " << tb_schema.table_name << std::endl; + std::cout << "Table dimension: " << tb_schema.dimension << std::endl; + BLOCK_SPLITER +} - void PrintSearchResult(const std::vector>& search_record_array, - const std::vector& topk_query_result_array) { - BLOCK_SPLITER - std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl; +void PrintSearchResult(const std::vector>& search_record_array, + const std::vector& topk_query_result_array) { + BLOCK_SPLITER + std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl; - int32_t index = 0; - for(auto& result : topk_query_result_array) { - auto search_id = search_record_array[index].first; - index++; - std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id) - << " top " << std::to_string(result.query_result_arrays.size()) - << " search result:" << std::endl; - for(auto& item : result.query_result_arrays) { - std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance); - std::cout << std::endl; - } - } - - BLOCK_SPLITER - } - - std::string CurrentTime() { - time_t tt; - time( &tt ); - tt = tt + 8*SECONDS_EACH_HOUR; - tm* t= gmtime( &tt ); - - std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1) - + "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour) - + "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec); - - return str; - } - - std::string CurrentTmDate(int64_t offset_day = 0) { - time_t tt; - time( &tt ); - tt = tt + 8*SECONDS_EACH_HOUR; - tt = tt + 24*SECONDS_EACH_HOUR*offset_day; - tm* t= gmtime( &tt ); - - std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) - + "-" + std::to_string(t->tm_mday); - - return str; - } - - std::string GetTableName() { - static std::string s_id(CurrentTime()); - return "tbl_" + s_id; - } - - TableSchema BuildTableSchema() { - TableSchema tb_schema; - tb_schema.table_name = TABLE_NAME; - tb_schema.dimension = TABLE_DIMENSION; - tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE; - - return tb_schema; - } - - void BuildVectors(int64_t from, int64_t to, - std::vector& vector_record_array) { - if(to <= from){ - return; - } - - vector_record_array.clear(); - for (int64_t k = from; k < to; k++) { - RowRecord record; - record.data.resize(TABLE_DIMENSION); - for(int64_t i = 0; i < TABLE_DIMENSION; i++) { - record.data[i] = (float)(k%(i+1)); - } - - vector_record_array.emplace_back(record); + int32_t index = 0; + for(auto& result : topk_query_result_array) { + auto search_id = search_record_array[index].first; + index++; + std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id) + << " top " << std::to_string(result.query_result_arrays.size()) + << " search result:" << std::endl; + for(auto& item : result.query_result_arrays) { + std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance); + std::cout << std::endl; } } - void Sleep(int seconds) { - std::cout << "Waiting " << seconds << " seconds ..." << std::endl; - sleep(seconds); + BLOCK_SPLITER +} + +std::string CurrentTime() { + time_t tt; + time( &tt ); + tt = tt + 8*SECONDS_EACH_HOUR; + tm* t= gmtime( &tt ); + + std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1) + + "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour) + + "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec); + + return str; +} + +std::string CurrentTmDate(int64_t offset_day = 0) { + time_t tt; + time( &tt ); + tt = tt + 8*SECONDS_EACH_HOUR; + tt = tt + 24*SECONDS_EACH_HOUR*offset_day; + tm* t= gmtime( &tt ); + + std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) + + "-" + std::to_string(t->tm_mday); + + return str; +} + +std::string GetTableName() { + static std::string s_id(CurrentTime()); + return "tbl_" + s_id; +} + +TableSchema BuildTableSchema() { + TableSchema tb_schema; + tb_schema.table_name = TABLE_NAME; + tb_schema.dimension = TABLE_DIMENSION; + tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE; + + return tb_schema; +} + +void BuildVectors(int64_t from, int64_t to, + std::vector& vector_record_array) { + if(to <= from){ + return; } - class TimeRecorder { - public: - explicit TimeRecorder(const std::string& title) - : title_(title) { - start_ = std::chrono::system_clock::now(); + vector_record_array.clear(); + for (int64_t k = from; k < to; k++) { + RowRecord record; + record.data.resize(TABLE_DIMENSION); + for(int64_t i = 0; i < TABLE_DIMENSION; i++) { + record.data[i] = (float)(k%(i+1)); } - ~TimeRecorder() { - std::chrono::system_clock::time_point end = std::chrono::system_clock::now(); - long span = (std::chrono::duration_cast (end - start_)).count(); - std::cout << title_ << " totally cost: " << span << " ms" << std::endl; - } + vector_record_array.emplace_back(record); + } +} - private: - std::string title_; - std::chrono::system_clock::time_point start_; - }; +void Sleep(int seconds) { + std::cout << "Waiting " << seconds << " seconds ..." << std::endl; + sleep(seconds); +} - void CheckResult(const std::vector>& search_record_array, - const std::vector& topk_query_result_array) { - BLOCK_SPLITER - int64_t index = 0; - for(auto& result : topk_query_result_array) { - auto result_id = result.query_result_arrays[0].id; - auto search_id = search_record_array[index++].first; - if(result_id != search_id) { - std::cout << "The top 1 result is wrong: " << result_id - << " vs. " << search_id << std::endl; - } else { - std::cout << "Check result sucessfully" << std::endl; - } - } - BLOCK_SPLITER +class TimeRecorder { + public: + explicit TimeRecorder(const std::string& title) + : title_(title) { + start_ = std::chrono::system_clock::now(); } - void DoSearch(std::shared_ptr conn, - const std::vector>& search_record_array, - const std::string& phase_name) { - std::vector query_range_array; - Range rg; - rg.start_value = CurrentTmDate(); - rg.end_value = CurrentTmDate(1); - query_range_array.emplace_back(rg); - - std::vector record_array; - for(auto& pair : search_record_array) { - record_array.push_back(pair.second); - } - - auto start = std::chrono::high_resolution_clock::now(); - for (auto i = 0; i < 5; ++i) { - std::vector topk_query_result_array; - { - TimeRecorder rc(phase_name); - Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array); - std::cout << "SearchVector function call status: " << stat.ToString() << std::endl; - } - } - auto finish = std::chrono::high_resolution_clock::now(); - std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast>(finish - start).count() << "s\n"; - -// PrintSearchResult(search_record_array, topk_query_result_array); -// CheckResult(search_record_array, topk_query_result_array); + ~TimeRecorder() { + std::chrono::system_clock::time_point end = std::chrono::system_clock::now(); + long span = (std::chrono::duration_cast (end - start_)).count(); + std::cout << title_ << " totally cost: " << span << " ms" << std::endl; } + + private: + std::string title_; + std::chrono::system_clock::time_point start_; +}; + +void CheckResult(const std::vector>& search_record_array, + const std::vector& topk_query_result_array) { + BLOCK_SPLITER + int64_t index = 0; + for(auto& result : topk_query_result_array) { + auto result_id = result.query_result_arrays[0].id; + auto search_id = search_record_array[index++].first; + if(result_id != search_id) { + std::cout << "The top 1 result is wrong: " << result_id + << " vs. " << search_id << std::endl; + } else { + std::cout << "Check result sucessfully" << std::endl; + } + } + BLOCK_SPLITER +} + +void DoSearch(std::shared_ptr conn, + const std::vector>& search_record_array, + const std::string& phase_name) { + std::vector query_range_array; + Range rg; + rg.start_value = CurrentTmDate(); + rg.end_value = CurrentTmDate(1); + query_range_array.emplace_back(rg); + + std::vector record_array; + for(auto& pair : search_record_array) { + record_array.push_back(pair.second); + } + + auto start = std::chrono::high_resolution_clock::now(); + std::vector topk_query_result_array; + { + TimeRecorder rc(phase_name); + Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array); + std::cout << "SearchVector function call status: " << stat.ToString() << std::endl; + } + auto finish = std::chrono::high_resolution_clock::now(); + std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast>(finish - start).count() << "s\n"; + + PrintSearchResult(search_record_array, topk_query_result_array); + CheckResult(search_record_array, topk_query_result_array); +} } void @@ -216,9 +214,9 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::cout << "All tables: " << std::endl; for(auto& table : tables) { int64_t row_count = 0; -// conn->DropTable(table); - stat = conn->CountTable(table, row_count); - std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl; + conn->DropTable(table); +// stat = conn->CountTable(table, row_count); +// std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl; } } @@ -273,7 +271,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { if(search_record_array.size() < NQ) { search_record_array.push_back( - std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET])); + std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET])); } } } @@ -345,4 +343,4 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::string status = conn->ServerStatus(); std::cout << "Server status after disconnect: " << status << std::endl; } -} \ No newline at end of file +} diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index f8f6deea98..4b4a80e8da 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -57,6 +57,8 @@ static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id"; static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader"; static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor"; static const char* CONFIG_RESOURCE_CONNECTIONS = "connections"; +static const char* CONFIG_SPEED_CONNECTIONS = "speed"; +static const char* CONFIG_ENDPOINT_CONNECTIONS = "connections"; class ServerConfig { diff --git a/cpp/unittest/scheduler/algorithm_test.cpp b/cpp/unittest/scheduler/algorithm_test.cpp new file mode 100644 index 0000000000..43e6c4eae2 --- /dev/null +++ b/cpp/unittest/scheduler/algorithm_test.cpp @@ -0,0 +1,99 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include + +#include "scheduler/resource/Resource.h" +#include "scheduler/ResourceMgr.h" +#include "scheduler/resource/CpuResource.h" +#include "scheduler/ResourceFactory.h" +#include "scheduler/Algorithm.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +class AlgorithmTest : public testing::Test { + protected: + void + SetUp() override { + ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false); + ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, true); + ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1); + ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2); + ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0); + ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1); + + res_mgr_ = std::make_shared(); + disk_ = res_mgr_->Add(std::move(disk)); + cpu_0_ = res_mgr_->Add(std::move(cpu0)); + cpu_1_ = res_mgr_->Add(std::move(cpu1)); + cpu_2_ = res_mgr_->Add(std::move(cpu2)); + gpu_0_ = res_mgr_->Add(std::move(gpu0)); + gpu_1_ = res_mgr_->Add(std::move(gpu1)); + auto IO = Connection("IO", 5.0); + auto PCIE = Connection("PCIE", 11.0); + res_mgr_->Connect("disk", "cpu0", IO); + res_mgr_->Connect("cpu0", "cpu1", IO); + res_mgr_->Connect("cpu1", "cpu2", IO); + res_mgr_->Connect("cpu0", "cpu2", IO); + res_mgr_->Connect("cpu1", "gpu0", PCIE); + res_mgr_->Connect("cpu2", "gpu1", PCIE); + } + + ResourceWPtr disk_; + ResourceWPtr cpu_0_; + ResourceWPtr cpu_1_; + ResourceWPtr cpu_2_; + ResourceWPtr gpu_0_; + ResourceWPtr gpu_1_; + ResourceMgrPtr res_mgr_; +}; + +TEST_F(AlgorithmTest, ShortestPath_test) { + std::vector sp; + uint64_t cost; + cost = ShortestPath(disk_.lock(), gpu_0_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + std::cout << "************************************\n"; + cost = ShortestPath(cpu_0_.lock(), gpu_0_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + std::cout << "************************************\n"; + cost = ShortestPath(disk_.lock(), disk_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + std::cout << "************************************\n"; + cost = ShortestPath(cpu_0_.lock(), disk_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + std::cout << "************************************\n"; + cost = ShortestPath(cpu_2_.lock(), gpu_0_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + +} + + +} +} +} \ No newline at end of file diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index fd6017fadd..d1e7114ccb 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -30,7 +30,7 @@ protected: resources_.push_back(gpu_resource_); auto subscriber = [&](EventPtr event) { - if (event->Type() == EventType::COPY_COMPLETED) { + if (event->Type() == EventType::LOAD_COMPLETED) { std::lock_guard lock(load_mutex_); ++load_count_; cv_.notify_one(); diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp index 5335dc8de6..e05d31e3e0 100644 --- a/cpp/unittest/scheduler/scheduler_test.cpp +++ b/cpp/unittest/scheduler/scheduler_test.cpp @@ -13,6 +13,7 @@ #include "scheduler/resource/Resource.h" #include "utils/Error.h" #include "wrapper/knowhere/vec_index.h" +#include "scheduler/tasklabel/SpecResLabel.h" namespace zilliz { namespace milvus { @@ -122,9 +123,6 @@ protected: ResourceMgrPtr res_mgr_; std::shared_ptr scheduler_; - uint64_t load_count_ = 0; - std::mutex load_mutex_; - std::condition_variable cv_; }; void @@ -157,6 +155,74 @@ TEST_F(SchedulerTest, OnCopyCompleted) { ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); } +class SchedulerTest2 : public testing::Test { + protected: + void + SetUp() override { + ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false); + ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false); + ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false); + ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false); + ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true); + ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true); + + res_mgr_ = std::make_shared(); + disk_ = res_mgr_->Add(std::move(disk)); + cpu_0_ = res_mgr_->Add(std::move(cpu0)); + cpu_1_ = res_mgr_->Add(std::move(cpu1)); + cpu_2_ = res_mgr_->Add(std::move(cpu2)); + gpu_0_ = res_mgr_->Add(std::move(gpu0)); + gpu_1_ = res_mgr_->Add(std::move(gpu1)); + auto IO = Connection("IO", 5.0); + auto PCIE1 = Connection("PCIE", 11.0); + auto PCIE2 = Connection("PCIE", 20.0); + res_mgr_->Connect("disk", "cpu0", IO); + res_mgr_->Connect("cpu0", "cpu1", IO); + res_mgr_->Connect("cpu1", "cpu2", IO); + res_mgr_->Connect("cpu0", "cpu2", IO); + res_mgr_->Connect("cpu1", "gpu0", PCIE1); + res_mgr_->Connect("cpu2", "gpu1", PCIE2); + + scheduler_ = std::make_shared(res_mgr_); + + res_mgr_->Start(); + scheduler_->Start(); + } + + void + TearDown() override { + scheduler_->Stop(); + res_mgr_->Stop(); + } + + ResourceWPtr disk_; + ResourceWPtr cpu_0_; + ResourceWPtr cpu_1_; + ResourceWPtr cpu_2_; + ResourceWPtr gpu_0_; + ResourceWPtr gpu_1_; + ResourceMgrPtr res_mgr_; + + std::shared_ptr scheduler_; +}; + + +TEST_F(SchedulerTest2, SpecifiedResourceTest) { + const uint64_t NUM = 10; + std::vector> tasks; + TableFileSchemaPtr dummy = std::make_shared(); + dummy->location_ = "location"; + + for (uint64_t i = 0; i < NUM; ++i) { + std::shared_ptr task = std::make_shared(dummy); + task->label() = std::make_shared(disk_); + tasks.push_back(task); + disk_.lock()->task_table().Put(task); + } + +// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); +} + } } }