From 6c62fb0962af2d86be9a98644a318a6dc6893578 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Fri, 30 Aug 2019 16:08:10 +0800 Subject: [PATCH 1/6] Add new API for PathChoice Former-commit-id: 324ae928a548266d891f18373dd78148b1e52499 --- cpp/CMakeLists.txt | 2 +- cpp/src/CMakeLists.txt | 2 +- cpp/src/scheduler/Algorithm.cpp | 24 +++++++++++++ cpp/src/scheduler/Algorithm.h | 21 ++++++++++++ cpp/src/scheduler/ResourceMgr.cpp | 5 +++ cpp/src/scheduler/ResourceMgr.h | 3 ++ cpp/src/scheduler/Scheduler.cpp | 34 +++++++++++++++++++ cpp/src/scheduler/Utils.cpp | 23 +++++++++++++ cpp/src/scheduler/Utils.h | 18 ++++++++++ cpp/src/scheduler/resource/Connection.h | 8 ++--- cpp/src/scheduler/resource/Resource.cpp | 7 ++++ cpp/src/scheduler/resource/Resource.h | 33 ++++++++++++++---- cpp/src/scheduler/task/Task.h | 9 +++++ .../examples/grpcsimple/src/ClientTest.cpp | 27 +++++++++++---- 14 files changed, 197 insertions(+), 19 deletions(-) create mode 100644 cpp/src/scheduler/Algorithm.cpp create mode 100644 cpp/src/scheduler/Algorithm.h create mode 100644 cpp/src/scheduler/Utils.cpp create mode 100644 cpp/src/scheduler/Utils.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 55da3c82a0..08898aacdf 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -34,7 +34,7 @@ else() endif() message(STATUS "Build type = ${BUILD_TYPE}") -#add_definitions(-DNEW_SCHEDULER) +add_definitions(-DNEW_SCHEDULER) project(milvus VERSION "${MILVUS_VERSION}") project(milvus_engine LANGUAGES CUDA CXX) diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 7af3c54787..80758bfaad 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -54,7 +54,7 @@ set(grpc_service_files grpc/gen-milvus/milvus.pb.cc grpc/gen-status/status.grpc.pb.cc grpc/gen-status/status.pb.cc - ) + scheduler/Utils.h) set(db_files ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp new file mode 100644 index 0000000000..d5a1e26beb --- /dev/null +++ b/cpp/src/scheduler/Algorithm.cpp @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Algorithm.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +std::vector +ShortestPath(const ResourcePtr &src, const ResourcePtr& dest) { + auto node = std::static_pointer_cast(src); + auto neighbours = node->GetNeighbours(); + for (auto &neighbour : neighbours) { + neighbour.connection.speed() + } +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/Algorithm.h b/cpp/src/scheduler/Algorithm.h new file mode 100644 index 0000000000..faf68824d4 --- /dev/null +++ b/cpp/src/scheduler/Algorithm.h @@ -0,0 +1,21 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "resource/Resource.h" + +#include +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +std::vector +ShortestPath(const ResourcePtr &src, const ResourcePtr& dest); + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 649f840827..8bcfe052da 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -28,6 +28,11 @@ ResourceMgr::GetNumOfComputeResource() { return count; } +std::vector +ResourceMgr::GetComputeResource() { + // TODO +} + uint64_t ResourceMgr::GetNumGpuResource() const { uint64_t num = 0; diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 5083aa1b53..16dc3b8346 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -47,6 +47,9 @@ public: uint64_t GetNumOfComputeResource(); + std::vector + GetComputeResource(); + /* * Add resource into Resource Management; * Generate functions on events; diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 4c7f0ce6f7..20f4308055 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -8,6 +8,7 @@ #include "Scheduler.h" #include "Cost.h" #include "action/Action.h" +#include "Algorithm.h" namespace zilliz { @@ -137,6 +138,39 @@ Scheduler::OnCopyCompleted(const EventPtr &event) { } break; } + case TaskLabelType::SPECIAL_RESOURCE: { + auto self = event->resource_.lock(); + // if this resource is disk, assign it to smallest cost resource + if (self->Type() == ResourceType::DISK) { + // step 1: + // calculate shortest path per resource, from disk to compute resource + // calculate by transport_cost + auto compute_resources = res_mgr_.lock()->GetComputeResource(); + std::vector> paths; + for (auto res : compute_resources) { + std::vector path = ShortestPath(self, res); + paths.emplace_back(path); + } + + // step 2: + // select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + std::vector costs; + for (auto res : compute_resources) { + uint64_t cost = res->TaskAvgCost() * res->NumOfTaskToExec() + transport_cost; + costs.emplace_back(cost); + } + + path, cost + + // step 3: + // set path in task + } + + // do or move + auto load_event = std::static_pointer_cast(event); + auto path = (load_event->task_table_item_->task->Path); + break; + } case TaskLabelType::BROADCAST: { Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); break; diff --git a/cpp/src/scheduler/Utils.cpp b/cpp/src/scheduler/Utils.cpp new file mode 100644 index 0000000000..2818baa707 --- /dev/null +++ b/cpp/src/scheduler/Utils.cpp @@ -0,0 +1,23 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Utils.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +uint64_t +get_now_timestamp(); { +std::chrono::time_point now = std::chrono::system_clock::now(); +auto duration = now.time_since_epoch(); +auto millis = std::chrono::duration_cast(duration).count(); +return millis; +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/Utils.h b/cpp/src/scheduler/Utils.h new file mode 100644 index 0000000000..b31cbd1534 --- /dev/null +++ b/cpp/src/scheduler/Utils.h @@ -0,0 +1,18 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +uint64_t +get_now_timestamp(); + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Connection.h b/cpp/src/scheduler/resource/Connection.h index 0f1088e7fe..cdab7a61fc 100644 --- a/cpp/src/scheduler/resource/Connection.h +++ b/cpp/src/scheduler/resource/Connection.h @@ -19,12 +19,12 @@ public: : name_(std::move(name)), speed_(speed) {} const std::string & - get_name() const { + name() const { return name_; } - const double - get_speed() const { + uint64_t + speed() const { return speed_; } @@ -38,7 +38,7 @@ public: private: std::string name_; - double speed_; + uint64_t speed_; }; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 6789d00c89..5396ee282b 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -4,6 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ #include +#include "../Utils.h" #include "Resource.h" @@ -139,7 +140,13 @@ void Resource::executor_function() { if (task_item == nullptr) { break; } + + auto start = get_now_timestamp(); Process(task_item->task); + auto finish = get_now_timestamp(); + ++total_task_; + total_cost_ += finish - start; + task_item->Executed(); if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 76c0b007bf..d1367ff196 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -44,7 +44,7 @@ enum class RegisterType { }; class Resource : public Node, public std::enable_shared_from_this { -public: + public: /* * Start loader and executor if enable; */ @@ -69,7 +69,7 @@ public: void WakeupExecutor(); -public: + public: template void Register_T(const RegisterType &type) { register_table_.emplace(type, [] { return std::make_shared(); }); @@ -110,6 +110,22 @@ public: return enable_executor_; } + // TODO: const + uint64_t + NumOfTaskToExec() { + uint64_t count = 0; + for (auto &task : task_table_) { + if (task->state == TaskTableItemState::LOADED) ++count; + } + return count; + } + + // TODO: need double ? + inline uint64_t + TaskAvgCost() const { + return total_cost_ / total_task_; + } + TaskTable & task_table(); @@ -120,7 +136,7 @@ public: friend std::ostream &operator<<(std::ostream &out, const Resource &resource); -protected: + protected: Resource(std::string name, ResourceType type, uint64_t device_id, @@ -142,7 +158,7 @@ protected: virtual void Process(TaskPtr task) = 0; -private: + private: /* * These function should move to cost.h ??? * COST.H ??? @@ -162,7 +178,7 @@ private: TaskTableItemPtr pick_task_execute(); -private: + private: /* * Only called by load thread; */ @@ -175,14 +191,17 @@ private: void executor_function(); -protected: + protected: uint64_t device_id_; std::string name_; -private: + private: ResourceType type_; TaskTable task_table_; + uint64_t total_cost_ = 0; + uint64_t total_task_ = 0; + std::map> register_table_; std::function subscriber_ = nullptr; diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index 31a1a88404..0a73b2384a 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -44,6 +44,14 @@ public: inline TaskType Type() const { return type_; } + /* + * Transport path; + */ + inline std::vector& + path() { + return path_; + } + /* * Getter and Setter; */ @@ -64,6 +72,7 @@ public: Clone() = 0; public: + std::vector path_; std::vector search_contexts_; ScheduleTaskPtr task_; TaskType type_; diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index 06b6e45fce..7e74b6f67f 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -24,8 +24,8 @@ namespace { constexpr int64_t TABLE_DIMENSION = 512; constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; constexpr int64_t BATCH_ROW_COUNT = 1000000; - constexpr int64_t NQ = 100; - constexpr int64_t TOP_K = 10; + constexpr int64_t NQ = 10; + constexpr int64_t TOP_K = 1000; constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different constexpr int64_t ADD_VECTOR_LOOP = 1; constexpr int64_t SECONDS_EACH_HOUR = 3600; @@ -177,14 +177,17 @@ namespace { std::vector topk_query_result_array; { TimeRecorder rc(phase_name); - Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array); + Status stat = conn->Search("zilliz_face", record_array, query_range_array, TOP_K, 10, topk_query_result_array); std::cout << "SearchVector function call status: " << stat.ToString() << std::endl; } + if (i == 0) { + PrintSearchResult(search_record_array, topk_query_result_array); + } } auto finish = std::chrono::high_resolution_clock::now(); std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast>(finish - start).count() << "s\n"; -// PrintSearchResult(search_record_array, topk_query_result_array); + // CheckResult(search_record_array, topk_query_result_array); } } @@ -284,7 +287,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { int64_t row_count = 0; Status stat = conn->CountTable(TABLE_NAME, row_count); std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl; - DoSearch(conn, search_record_array, "Search without index"); +// DoSearch(conn, search_record_array, "Search without index"); } {//wait unit build index finish @@ -308,7 +311,19 @@ ClientTest::Test(const std::string& address, const std::string& port) { } {//search vectors after build index finish - DoSearch(conn, search_record_array, "Search after build index finish"); + std::vector> search_array; + std::vector row_record_array; + row_record_array.resize(NQ); + for (int64_t i = 0; i < NQ; ++i) { + row_record_array[i].data.resize(TABLE_DIMENSION); + for (auto j = 0; j < TABLE_DIMENSION; ++j) { + row_record_array[i].data[j] = 1; + } + search_array.push_back(std::make_pair(i, row_record_array[i])); + } + + DoSearch(conn, search_array, "Search after build index finish"); + // std::cout << conn->DumpTaskTables() << std::endl; } From 576e7f12a44052271fabc31e7f24f8ab53ec02e7 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 2 Sep 2019 10:54:29 +0800 Subject: [PATCH 2/6] MS-455 Distribute tasks by minimal cost in scheduler Former-commit-id: 11b0752a1b4d19b94fca51701ce2abaece2ef4b8 --- cpp/conf/server_config.template | 8 +- cpp/src/scheduler/Algorithm.cpp | 88 +++++++++++++++++-- cpp/src/scheduler/Algorithm.h | 8 +- cpp/src/scheduler/ResourceMgr.cpp | 23 ++++- cpp/src/scheduler/ResourceMgr.h | 6 ++ cpp/src/scheduler/Scheduler.cpp | 50 +++++++---- cpp/src/scheduler/Utils.cpp | 12 +-- cpp/src/scheduler/Utils.h | 2 +- cpp/src/scheduler/resource/Connection.h | 5 ++ cpp/src/scheduler/resource/Resource.cpp | 4 +- cpp/src/scheduler/resource/Resource.h | 4 +- cpp/src/scheduler/task/Path.h | 64 ++++++++++++++ cpp/src/scheduler/task/Task.h | 7 +- cpp/src/scheduler/tasklabel/SpecResLabel.h | 10 +-- cpp/src/scheduler/tasklabel/TaskLabel.h | 2 +- cpp/unittest/CMakeLists.txt | 2 +- cpp/unittest/scheduler/algorithm_test.cpp | 99 ++++++++++++++++++++++ cpp/unittest/scheduler/resource_test.cpp | 2 +- cpp/unittest/scheduler/scheduler_test.cpp | 72 +++++++++++++++- 19 files changed, 413 insertions(+), 55 deletions(-) create mode 100644 cpp/src/scheduler/task/Path.h create mode 100644 cpp/unittest/scheduler/algorithm_test.cpp diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index 0e28737d99..386b61d1f1 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -70,15 +70,15 @@ resource_config: type: GPU memory: 6 device_id: 0 - enable_loader: true - enable_executor: true + enable_loader: false + enable_executor: false gtx1660: type: GPU memory: 6 device_id: 1 - enable_loader: true - enable_executor: true + enable_loader: false + enable_executor: false # connection list, length: 0~N # format: -${resource_name}===${resource_name} diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp index d5a1e26beb..90e4927577 100644 --- a/cpp/src/scheduler/Algorithm.cpp +++ b/cpp/src/scheduler/Algorithm.cpp @@ -10,13 +10,89 @@ namespace zilliz { namespace milvus { namespace engine { -std::vector -ShortestPath(const ResourcePtr &src, const ResourcePtr& dest) { - auto node = std::static_pointer_cast(src); - auto neighbours = node->GetNeighbours(); - for (auto &neighbour : neighbours) { - neighbour.connection.speed() +constexpr uint64_t MAXINT = 99999; + +uint64_t +ShortestPath(const ResourcePtr &src, + const ResourcePtr &dest, + const ResourceMgrPtr &res_mgr, + std::vector &path) { + + std::vector> paths; + + uint64_t num_of_resources = res_mgr->GetAllResouces().size(); + uint64_t src_id, dest_id; + std::unordered_map id_name_map; + std::unordered_map name_id_map; + for (auto i = 0; i < num_of_resources; ++i) { + id_name_map.insert(std::make_pair(i, res_mgr->GetAllResouces().at(i)->Name())); + name_id_map.insert(std::make_pair(res_mgr->GetAllResouces().at(i)->Name(), i)); } + + std::vector > dis_matrix; + dis_matrix.resize(num_of_resources); + for (auto i = 0; i < num_of_resources; ++i) { + dis_matrix[i].resize(num_of_resources); + for (auto j = 0; j < num_of_resources; ++j) { + dis_matrix[i][j] = MAXINT; + } + dis_matrix[i][i] = 0; + } + + std::vector vis(num_of_resources, false); + std::vector dis(num_of_resources, MAXINT); + for (auto &res : res_mgr->GetAllResouces()) { + + auto cur_node = std::static_pointer_cast(res); + auto cur_neighbours = cur_node->GetNeighbours(); + + for (auto &neighbour : cur_neighbours) { + auto neighbour_res = std::static_pointer_cast(neighbour.neighbour_node.lock()); + dis_matrix[name_id_map.at(res->Name())][name_id_map.at(neighbour_res->Name())] = + neighbour.connection.transport_cost(); + } + } + + for (uint64_t i = 0; i < num_of_resources; ++i) { + dis[i] = dis_matrix[name_id_map.at(src->Name())][i]; + } + + vis[name_id_map.at(src->Name())] = true; + std::vector parent(num_of_resources, -1); + + for (uint64_t i = 0; i < num_of_resources; ++i) { + uint64_t minn = MAXINT; + uint64_t temp; + for (auto j = 0; j < num_of_resources; ++j) { + if (!vis[j] && dis[j] < minn) { + minn = dis[j]; + temp = j; + } + } + vis[temp] = true; + + if (i == 0) { + parent[temp] = name_id_map.at(src->Name()); + } + + for (uint64_t j = 0; j < num_of_resources; ++j) { + if (!vis[j] && dis_matrix[temp][j] != MAXINT && dis_matrix[temp][j] + dis[temp] < dis[j]) { + dis[j] = dis_matrix[temp][j] + dis[temp]; + parent[j] = temp; + } + } + } + + int64_t parent_idx = parent[name_id_map.at(dest->Name())]; + if (parent_idx != -1) { + path.push_back(dest->Name()); + } + while (parent_idx != -1) { + path.push_back(id_name_map.at(parent_idx)); + parent_idx = parent[parent_idx]; + } +// result.push_back(id_name_map.at(parent_idx)); + return dis[name_id_map.at(dest->Name())]; } } diff --git a/cpp/src/scheduler/Algorithm.h b/cpp/src/scheduler/Algorithm.h index faf68824d4..05d9ad71d8 100644 --- a/cpp/src/scheduler/Algorithm.h +++ b/cpp/src/scheduler/Algorithm.h @@ -5,6 +5,7 @@ ******************************************************************************/ #include "resource/Resource.h" +#include "ResourceMgr.h" #include #include @@ -13,8 +14,11 @@ namespace zilliz { namespace milvus { namespace engine { -std::vector -ShortestPath(const ResourcePtr &src, const ResourcePtr& dest); +uint64_t +ShortestPath(const ResourcePtr &src, + const ResourcePtr &dest, + const ResourceMgrPtr &res_mgr, + std::vector& path); } } diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 8bcfe052da..65373164e3 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -30,7 +30,13 @@ ResourceMgr::GetNumOfComputeResource() { std::vector ResourceMgr::GetComputeResource() { - // TODO + std::vector result; + for (auto &resource : resources_) { + if (resource->HasExecutor()) { + result.emplace_back(resource); + } + } + return result; } uint64_t @@ -54,6 +60,21 @@ ResourceMgr::GetResource(ResourceType type, uint64_t device_id) { return nullptr; } +ResourcePtr +ResourceMgr::GetResourceByName(std::string name) { + for (auto &resource : resources_) { + if (resource->Name() == name) { + return resource; + } + } + return nullptr; +} + +std::vector +ResourceMgr::GetAllResouces() { + return resources_; +} + ResourceWPtr ResourceMgr::Add(ResourcePtr &&resource) { ResourceWPtr ret(resource); diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 16dc3b8346..da8f34f87e 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -41,6 +41,12 @@ public: ResourcePtr GetResource(ResourceType type, uint64_t device_id); + ResourcePtr + GetResourceByName(std::string name); + + std::vector + GetAllResouces(); + /* * Return account of resource which enable executor; */ diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 094b748ea3..8cd402500c 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -5,6 +5,7 @@ ******************************************************************************/ #include +#include "event/LoadCompletedEvent.h" #include "Scheduler.h" #include "Cost.h" #include "action/Action.h" @@ -138,37 +139,50 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { } break; } - case TaskLabelType::SPECIAL_RESOURCE: { + case TaskLabelType::SPECIFIED_RESOURCE: { auto self = event->resource_.lock(); + auto task = load_completed_event->task_table_item_->task; + // if this resource is disk, assign it to smallest cost resource if (self->Type() == ResourceType::DISK) { - // step 1: - // calculate shortest path per resource, from disk to compute resource - // calculate by transport_cost + // step 1: calculate shortest path per resource, from disk to compute resource auto compute_resources = res_mgr_.lock()->GetComputeResource(); std::vector> paths; + std::vector transport_costs; for (auto res : compute_resources) { - std::vector path = ShortestPath(self, res); + std::vector path; + uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path); + transport_costs.push_back(transport_cost); paths.emplace_back(path); } - // step 2: - // select min cost, cost(resource) = avg_cost * task_to_do + transport_cost - std::vector costs; - for (auto res : compute_resources) { - uint64_t cost = res->TaskAvgCost() * res->NumOfTaskToExec() + transport_cost; - costs.emplace_back(cost); + // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + std::vector costs; + uint64_t min_cost = std::numeric_limits::max(); + uint64_t min_cost_idx; + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + + transport_costs[i]; + costs.push_back(cost); + if (min_cost > cost) { + min_cost = cost; + min_cost_idx = i; + } } - path, cost - - // step 3: - // set path in task + // step 3: set path in task + Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); + task->path() = task_path; } - // do or move - auto load_event = std::static_pointer_cast(event); - auto path = (load_event->task_table_item_->task->Path); + if(self->Name() == task->path().Last()) { + self->WakeupLoader(); + } else { + auto next_res_name = task->path().Next(); + auto next_res = res_mgr_.lock()->GetResourceByName(next_res_name); +// task->Move(); + next_res->task_table().Put(task); + } break; } case TaskLabelType::BROADCAST: { diff --git a/cpp/src/scheduler/Utils.cpp b/cpp/src/scheduler/Utils.cpp index 2818baa707..074c035e8e 100644 --- a/cpp/src/scheduler/Utils.cpp +++ b/cpp/src/scheduler/Utils.cpp @@ -4,6 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ +#include #include "Utils.h" namespace zilliz { @@ -11,11 +12,12 @@ namespace milvus { namespace engine { uint64_t -get_now_timestamp(); { -std::chrono::time_point now = std::chrono::system_clock::now(); -auto duration = now.time_since_epoch(); -auto millis = std::chrono::duration_cast(duration).count(); -return millis; +get_current_timestamp() +{ + std::chrono::time_point now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto millis = std::chrono::duration_cast(duration).count(); + return millis; } } diff --git a/cpp/src/scheduler/Utils.h b/cpp/src/scheduler/Utils.h index b31cbd1534..7a5bf1874d 100644 --- a/cpp/src/scheduler/Utils.h +++ b/cpp/src/scheduler/Utils.h @@ -11,7 +11,7 @@ namespace milvus { namespace engine { uint64_t -get_now_timestamp(); +get_current_timestamp(); } } diff --git a/cpp/src/scheduler/resource/Connection.h b/cpp/src/scheduler/resource/Connection.h index cdab7a61fc..83c9cc529c 100644 --- a/cpp/src/scheduler/resource/Connection.h +++ b/cpp/src/scheduler/resource/Connection.h @@ -28,6 +28,11 @@ public: return speed_; } + uint64_t + transport_cost() { + return 1024 / speed_; + } + public: std::string Dump() const { diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 9fea7e5ed1..8d82d4821c 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -140,9 +140,9 @@ void Resource::executor_function() { break; } - auto start = get_now_timestamp(); + auto start = get_current_timestamp(); Process(task_item->task); - auto finish = get_now_timestamp(); + auto finish = get_current_timestamp(); ++total_task_; total_cost_ += finish - start; diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index b0ad414958..5996d07787 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -199,8 +199,8 @@ class Resource : public Node, public std::enable_shared_from_this { TaskTable task_table_; - uint64_t total_cost_ = 0; - uint64_t total_task_ = 0; + uint64_t total_cost_ = 10; + uint64_t total_task_ = 10; std::map> register_table_; std::function subscriber_ = nullptr; diff --git a/cpp/src/scheduler/task/Path.h b/cpp/src/scheduler/task/Path.h new file mode 100644 index 0000000000..8d5c6ee5d2 --- /dev/null +++ b/cpp/src/scheduler/task/Path.h @@ -0,0 +1,64 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Path { + public: + Path() = default; + + Path(std::vector& path, uint64_t index) : path_(path), index_(index) {} + + void + push_back(const std::string &str) { + path_.push_back(str); + } + + std::vector + Dump() { + return path_; + } + + std::string & + Next() { + --index_; + return path_[index_]; + } + + std::string & + Last() { + if (!path_.empty()) { + return path_[0]; + } else { + std::string str; + return str; + } + } + + public: + std::string & + operator[](uint64_t index) { + return path_[index]; + } + + std::vector::iterator begin() { return path_.begin(); } + std::vector::iterator end() { return path_.end(); } + + public: + std::vector path_; + uint64_t index_ = 0; +}; + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index 0a73b2384a..7431679e13 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -8,6 +8,7 @@ #include "db/scheduler/context/SearchContext.h" #include "db/scheduler/task/IScheduleTask.h" #include "scheduler/tasklabel/TaskLabel.h" +#include "Path.h" #include #include @@ -47,9 +48,9 @@ public: /* * Transport path; */ - inline std::vector& + inline Path& path() { - return path_; + return task_path_; } /* @@ -72,7 +73,7 @@ public: Clone() = 0; public: - std::vector path_; + Path task_path_; std::vector search_contexts_; ScheduleTaskPtr task_; TaskType type_; diff --git a/cpp/src/scheduler/tasklabel/SpecResLabel.h b/cpp/src/scheduler/tasklabel/SpecResLabel.h index 9f69f5752f..51468bf28b 100644 --- a/cpp/src/scheduler/tasklabel/SpecResLabel.h +++ b/cpp/src/scheduler/tasklabel/SpecResLabel.h @@ -22,24 +22,24 @@ namespace engine { class SpecResLabel : public TaskLabel { public: SpecResLabel(const ResourceWPtr &resource) - : TaskLabel(TaskLabelType::SPECIAL_RESOURCE), resource_(resource) {} + : TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {} inline ResourceWPtr & - resource() const { + resource() { return resource_; } inline std::string & - resource_name() const { + resource_name() { return resource_name_; } private: ResourceWPtr resource_; std::string resource_name_; -} +}; -using SpecResLabelPtr = std::make_shared; +using SpecResLabelPtr = std::shared_ptr(); } } diff --git a/cpp/src/scheduler/tasklabel/TaskLabel.h b/cpp/src/scheduler/tasklabel/TaskLabel.h index 3f39b8ec12..84fd5ee77b 100644 --- a/cpp/src/scheduler/tasklabel/TaskLabel.h +++ b/cpp/src/scheduler/tasklabel/TaskLabel.h @@ -13,7 +13,7 @@ namespace engine { enum class TaskLabelType { DEFAULT, // means can be executed in any resource - SPECIAL_RESOURCE, // means must executing in special resource + SPECIFIED_RESOURCE, // means must executing in special resource BROADCAST, // means all enable-executor resource must execute task }; diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index ac666c86a9..287fe51128 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -42,5 +42,5 @@ add_subdirectory(server) add_subdirectory(db) add_subdirectory(knowhere) add_subdirectory(metrics) -#add_subdirectory(scheduler) +add_subdirectory(scheduler) #add_subdirectory(storage) \ No newline at end of file diff --git a/cpp/unittest/scheduler/algorithm_test.cpp b/cpp/unittest/scheduler/algorithm_test.cpp new file mode 100644 index 0000000000..43e6c4eae2 --- /dev/null +++ b/cpp/unittest/scheduler/algorithm_test.cpp @@ -0,0 +1,99 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include + +#include "scheduler/resource/Resource.h" +#include "scheduler/ResourceMgr.h" +#include "scheduler/resource/CpuResource.h" +#include "scheduler/ResourceFactory.h" +#include "scheduler/Algorithm.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +class AlgorithmTest : public testing::Test { + protected: + void + SetUp() override { + ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false); + ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, true); + ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1); + ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2); + ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0); + ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1); + + res_mgr_ = std::make_shared(); + disk_ = res_mgr_->Add(std::move(disk)); + cpu_0_ = res_mgr_->Add(std::move(cpu0)); + cpu_1_ = res_mgr_->Add(std::move(cpu1)); + cpu_2_ = res_mgr_->Add(std::move(cpu2)); + gpu_0_ = res_mgr_->Add(std::move(gpu0)); + gpu_1_ = res_mgr_->Add(std::move(gpu1)); + auto IO = Connection("IO", 5.0); + auto PCIE = Connection("PCIE", 11.0); + res_mgr_->Connect("disk", "cpu0", IO); + res_mgr_->Connect("cpu0", "cpu1", IO); + res_mgr_->Connect("cpu1", "cpu2", IO); + res_mgr_->Connect("cpu0", "cpu2", IO); + res_mgr_->Connect("cpu1", "gpu0", PCIE); + res_mgr_->Connect("cpu2", "gpu1", PCIE); + } + + ResourceWPtr disk_; + ResourceWPtr cpu_0_; + ResourceWPtr cpu_1_; + ResourceWPtr cpu_2_; + ResourceWPtr gpu_0_; + ResourceWPtr gpu_1_; + ResourceMgrPtr res_mgr_; +}; + +TEST_F(AlgorithmTest, ShortestPath_test) { + std::vector sp; + uint64_t cost; + cost = ShortestPath(disk_.lock(), gpu_0_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + std::cout << "************************************\n"; + cost = ShortestPath(cpu_0_.lock(), gpu_0_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + std::cout << "************************************\n"; + cost = ShortestPath(disk_.lock(), disk_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + std::cout << "************************************\n"; + cost = ShortestPath(cpu_0_.lock(), disk_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + std::cout << "************************************\n"; + cost = ShortestPath(cpu_2_.lock(), gpu_0_.lock(), res_mgr_, sp); + while (!sp.empty()) { + std::cout << sp[sp.size() - 1] << std::endl; + sp.pop_back(); + } + + +} + + +} +} +} \ No newline at end of file diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index fd6017fadd..d1e7114ccb 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -30,7 +30,7 @@ protected: resources_.push_back(gpu_resource_); auto subscriber = [&](EventPtr event) { - if (event->Type() == EventType::COPY_COMPLETED) { + if (event->Type() == EventType::LOAD_COMPLETED) { std::lock_guard lock(load_mutex_); ++load_count_; cv_.notify_one(); diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp index 5335dc8de6..e05d31e3e0 100644 --- a/cpp/unittest/scheduler/scheduler_test.cpp +++ b/cpp/unittest/scheduler/scheduler_test.cpp @@ -13,6 +13,7 @@ #include "scheduler/resource/Resource.h" #include "utils/Error.h" #include "wrapper/knowhere/vec_index.h" +#include "scheduler/tasklabel/SpecResLabel.h" namespace zilliz { namespace milvus { @@ -122,9 +123,6 @@ protected: ResourceMgrPtr res_mgr_; std::shared_ptr scheduler_; - uint64_t load_count_ = 0; - std::mutex load_mutex_; - std::condition_variable cv_; }; void @@ -157,6 +155,74 @@ TEST_F(SchedulerTest, OnCopyCompleted) { ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); } +class SchedulerTest2 : public testing::Test { + protected: + void + SetUp() override { + ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false); + ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false); + ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false); + ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false); + ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true); + ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true); + + res_mgr_ = std::make_shared(); + disk_ = res_mgr_->Add(std::move(disk)); + cpu_0_ = res_mgr_->Add(std::move(cpu0)); + cpu_1_ = res_mgr_->Add(std::move(cpu1)); + cpu_2_ = res_mgr_->Add(std::move(cpu2)); + gpu_0_ = res_mgr_->Add(std::move(gpu0)); + gpu_1_ = res_mgr_->Add(std::move(gpu1)); + auto IO = Connection("IO", 5.0); + auto PCIE1 = Connection("PCIE", 11.0); + auto PCIE2 = Connection("PCIE", 20.0); + res_mgr_->Connect("disk", "cpu0", IO); + res_mgr_->Connect("cpu0", "cpu1", IO); + res_mgr_->Connect("cpu1", "cpu2", IO); + res_mgr_->Connect("cpu0", "cpu2", IO); + res_mgr_->Connect("cpu1", "gpu0", PCIE1); + res_mgr_->Connect("cpu2", "gpu1", PCIE2); + + scheduler_ = std::make_shared(res_mgr_); + + res_mgr_->Start(); + scheduler_->Start(); + } + + void + TearDown() override { + scheduler_->Stop(); + res_mgr_->Stop(); + } + + ResourceWPtr disk_; + ResourceWPtr cpu_0_; + ResourceWPtr cpu_1_; + ResourceWPtr cpu_2_; + ResourceWPtr gpu_0_; + ResourceWPtr gpu_1_; + ResourceMgrPtr res_mgr_; + + std::shared_ptr scheduler_; +}; + + +TEST_F(SchedulerTest2, SpecifiedResourceTest) { + const uint64_t NUM = 10; + std::vector> tasks; + TableFileSchemaPtr dummy = std::make_shared(); + dummy->location_ = "location"; + + for (uint64_t i = 0; i < NUM; ++i) { + std::shared_ptr task = std::make_shared(dummy); + task->label() = std::make_shared(disk_); + tasks.push_back(task); + disk_.lock()->task_table().Put(task); + } + +// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); +} + } } } From ff283ab44cae9293154ee8848ec53f364c1df3d7 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 2 Sep 2019 15:47:01 +0800 Subject: [PATCH 3/6] fix multithread bugs in DBImpl Former-commit-id: e4f12f64235bf5b5a84e69cfe866192240705ada --- cpp/conf/server_config.template | 11 +- cpp/src/db/DBImpl.cpp | 4 +- cpp/src/scheduler/Algorithm.cpp | 12 +- cpp/src/scheduler/Scheduler.cpp | 14 +- cpp/src/scheduler/resource/Resource.h | 5 + cpp/src/scheduler/task/Path.h | 16 +- .../examples/grpcsimple/src/ClientTest.cpp | 329 +++++++++--------- cpp/unittest/CMakeLists.txt | 2 +- 8 files changed, 189 insertions(+), 204 deletions(-) diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index daf75459da..218dceed7a 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -70,15 +70,8 @@ resource_config: type: GPU memory: 6 device_id: 0 - enable_loader: false - enable_executor: false - - gtx1660: - type: GPU - memory: 6 - device_id: 1 - enable_loader: false - enable_executor: false + enable_loader: true + enable_executor: true # gtx1660: # type: GPU diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index b744899d56..6bf056e471 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -58,14 +58,14 @@ Status DBImpl::Start() { return Status::OK(); } + shutting_down_.store(false, std::memory_order_release); + //for distribute version, some nodes are read only if (options_.mode != Options::MODE::READ_ONLY) { ENGINE_LOG_TRACE << "StartTimerTasks"; bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); } - shutting_down_.store(false, std::memory_order_release); - return Status::OK(); } diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp index 90e4927577..b861151ddf 100644 --- a/cpp/src/scheduler/Algorithm.cpp +++ b/cpp/src/scheduler/Algorithm.cpp @@ -21,19 +21,18 @@ ShortestPath(const ResourcePtr &src, std::vector> paths; uint64_t num_of_resources = res_mgr->GetAllResouces().size(); - uint64_t src_id, dest_id; std::unordered_map id_name_map; std::unordered_map name_id_map; - for (auto i = 0; i < num_of_resources; ++i) { + for (uint64_t i = 0; i < num_of_resources; ++i) { id_name_map.insert(std::make_pair(i, res_mgr->GetAllResouces().at(i)->Name())); name_id_map.insert(std::make_pair(res_mgr->GetAllResouces().at(i)->Name(), i)); } std::vector > dis_matrix; dis_matrix.resize(num_of_resources); - for (auto i = 0; i < num_of_resources; ++i) { + for (uint64_t i = 0; i < num_of_resources; ++i) { dis_matrix[i].resize(num_of_resources); - for (auto j = 0; j < num_of_resources; ++j) { + for (uint64_t j = 0; j < num_of_resources; ++j) { dis_matrix[i][j] = MAXINT; } dis_matrix[i][i] = 0; @@ -62,8 +61,8 @@ ShortestPath(const ResourcePtr &src, for (uint64_t i = 0; i < num_of_resources; ++i) { uint64_t minn = MAXINT; - uint64_t temp; - for (auto j = 0; j < num_of_resources; ++j) { + uint64_t temp = 0; + for (uint64_t j = 0; j < num_of_resources; ++j) { if (!vis[j] && dis[j] < minn) { minn = dis[j]; temp = j; @@ -91,7 +90,6 @@ ShortestPath(const ResourcePtr &src, path.push_back(id_name_map.at(parent_idx)); parent_idx = parent[parent_idx]; } -// result.push_back(id_name_map.at(parent_idx)); return dis[name_id_map.at(dest->Name())]; } diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 8cd402500c..83c3e9864c 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -149,7 +149,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { auto compute_resources = res_mgr_.lock()->GetComputeResource(); std::vector> paths; std::vector transport_costs; - for (auto res : compute_resources) { + for (auto &res : compute_resources) { std::vector path; uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path); transport_costs.push_back(transport_cost); @@ -157,13 +157,15 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { } // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost - std::vector costs; uint64_t min_cost = std::numeric_limits::max(); - uint64_t min_cost_idx; + uint64_t min_cost_idx = 0; for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->TotalTasks() == 0) { + min_cost_idx = i; + break; + } uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i]; - costs.push_back(cost); if (min_cost > cost) { min_cost = cost; min_cost_idx = i; @@ -174,13 +176,13 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); task->path() = task_path; } - // do or move + if(self->Name() == task->path().Last()) { self->WakeupLoader(); } else { auto next_res_name = task->path().Next(); auto next_res = res_mgr_.lock()->GetResourceByName(next_res_name); -// task->Move(); + load_completed_event->task_table_item_->Move(); next_res->task_table().Put(task); } break; diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 5996d07787..f984cb4b46 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -126,6 +126,11 @@ class Resource : public Node, public std::enable_shared_from_this { return total_cost_ / total_task_; } + inline uint64_t + TotalTasks() const { + return total_task_; + } + TaskTable & task_table(); diff --git a/cpp/src/scheduler/task/Path.h b/cpp/src/scheduler/task/Path.h index 8d5c6ee5d2..388a7b9c82 100644 --- a/cpp/src/scheduler/task/Path.h +++ b/cpp/src/scheduler/task/Path.h @@ -29,19 +29,23 @@ class Path { return path_; } - std::string & + std::string Next() { - --index_; - return path_[index_]; + if (index_ > 0 && !path_.empty()) { + --index_; + return path_[index_]; + } else { + return nullptr; + } + } - std::string & + std::string Last() { if (!path_.empty()) { return path_[0]; } else { - std::string str; - return str; + return nullptr; } } diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index 7e74b6f67f..8198d5a232 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -18,178 +18,173 @@ using namespace milvus; //#define SET_VECTOR_IDS; namespace { - std::string GetTableName(); +std::string GetTableName(); - const std::string TABLE_NAME = GetTableName(); - constexpr int64_t TABLE_DIMENSION = 512; - constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; - constexpr int64_t BATCH_ROW_COUNT = 1000000; - constexpr int64_t NQ = 10; - constexpr int64_t TOP_K = 1000; - constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different - constexpr int64_t ADD_VECTOR_LOOP = 1; - constexpr int64_t SECONDS_EACH_HOUR = 3600; +const std::string TABLE_NAME = GetTableName(); +constexpr int64_t TABLE_DIMENSION = 512; +constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; +constexpr int64_t BATCH_ROW_COUNT = 100000; +constexpr int64_t NQ = 100; +constexpr int64_t TOP_K = 10; +constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different +constexpr int64_t ADD_VECTOR_LOOP = 1; +constexpr int64_t SECONDS_EACH_HOUR = 3600; #define BLOCK_SPLITER std::cout << "===========================================" << std::endl; - void PrintTableSchema(const TableSchema& tb_schema) { - BLOCK_SPLITER - std::cout << "Table name: " << tb_schema.table_name << std::endl; - std::cout << "Table dimension: " << tb_schema.dimension << std::endl; - BLOCK_SPLITER - } +void PrintTableSchema(const TableSchema& tb_schema) { + BLOCK_SPLITER + std::cout << "Table name: " << tb_schema.table_name << std::endl; + std::cout << "Table dimension: " << tb_schema.dimension << std::endl; + BLOCK_SPLITER +} - void PrintSearchResult(const std::vector>& search_record_array, - const std::vector& topk_query_result_array) { - BLOCK_SPLITER - std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl; +void PrintSearchResult(const std::vector>& search_record_array, + const std::vector& topk_query_result_array) { + BLOCK_SPLITER + std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl; - int32_t index = 0; - for(auto& result : topk_query_result_array) { - auto search_id = search_record_array[index].first; - index++; - std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id) - << " top " << std::to_string(result.query_result_arrays.size()) - << " search result:" << std::endl; - for(auto& item : result.query_result_arrays) { - std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance); - std::cout << std::endl; - } - } - - BLOCK_SPLITER - } - - std::string CurrentTime() { - time_t tt; - time( &tt ); - tt = tt + 8*SECONDS_EACH_HOUR; - tm* t= gmtime( &tt ); - - std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1) - + "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour) - + "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec); - - return str; - } - - std::string CurrentTmDate(int64_t offset_day = 0) { - time_t tt; - time( &tt ); - tt = tt + 8*SECONDS_EACH_HOUR; - tt = tt + 24*SECONDS_EACH_HOUR*offset_day; - tm* t= gmtime( &tt ); - - std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) - + "-" + std::to_string(t->tm_mday); - - return str; - } - - std::string GetTableName() { - static std::string s_id(CurrentTime()); - return "tbl_" + s_id; - } - - TableSchema BuildTableSchema() { - TableSchema tb_schema; - tb_schema.table_name = TABLE_NAME; - tb_schema.dimension = TABLE_DIMENSION; - tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE; - - return tb_schema; - } - - void BuildVectors(int64_t from, int64_t to, - std::vector& vector_record_array) { - if(to <= from){ - return; - } - - vector_record_array.clear(); - for (int64_t k = from; k < to; k++) { - RowRecord record; - record.data.resize(TABLE_DIMENSION); - for(int64_t i = 0; i < TABLE_DIMENSION; i++) { - record.data[i] = (float)(k%(i+1)); - } - - vector_record_array.emplace_back(record); + int32_t index = 0; + for(auto& result : topk_query_result_array) { + auto search_id = search_record_array[index].first; + index++; + std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id) + << " top " << std::to_string(result.query_result_arrays.size()) + << " search result:" << std::endl; + for(auto& item : result.query_result_arrays) { + std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance); + std::cout << std::endl; } } - void Sleep(int seconds) { - std::cout << "Waiting " << seconds << " seconds ..." << std::endl; - sleep(seconds); + BLOCK_SPLITER +} + +std::string CurrentTime() { + time_t tt; + time( &tt ); + tt = tt + 8*SECONDS_EACH_HOUR; + tm* t= gmtime( &tt ); + + std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1) + + "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour) + + "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec); + + return str; +} + +std::string CurrentTmDate(int64_t offset_day = 0) { + time_t tt; + time( &tt ); + tt = tt + 8*SECONDS_EACH_HOUR; + tt = tt + 24*SECONDS_EACH_HOUR*offset_day; + tm* t= gmtime( &tt ); + + std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) + + "-" + std::to_string(t->tm_mday); + + return str; +} + +std::string GetTableName() { + static std::string s_id(CurrentTime()); + return "tbl_" + s_id; +} + +TableSchema BuildTableSchema() { + TableSchema tb_schema; + tb_schema.table_name = TABLE_NAME; + tb_schema.dimension = TABLE_DIMENSION; + tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE; + + return tb_schema; +} + +void BuildVectors(int64_t from, int64_t to, + std::vector& vector_record_array) { + if(to <= from){ + return; } - class TimeRecorder { - public: - explicit TimeRecorder(const std::string& title) - : title_(title) { - start_ = std::chrono::system_clock::now(); + vector_record_array.clear(); + for (int64_t k = from; k < to; k++) { + RowRecord record; + record.data.resize(TABLE_DIMENSION); + for(int64_t i = 0; i < TABLE_DIMENSION; i++) { + record.data[i] = (float)(k%(i+1)); } - ~TimeRecorder() { - std::chrono::system_clock::time_point end = std::chrono::system_clock::now(); - long span = (std::chrono::duration_cast (end - start_)).count(); - std::cout << title_ << " totally cost: " << span << " ms" << std::endl; - } + vector_record_array.emplace_back(record); + } +} - private: - std::string title_; - std::chrono::system_clock::time_point start_; - }; +void Sleep(int seconds) { + std::cout << "Waiting " << seconds << " seconds ..." << std::endl; + sleep(seconds); +} - void CheckResult(const std::vector>& search_record_array, - const std::vector& topk_query_result_array) { - BLOCK_SPLITER - int64_t index = 0; - for(auto& result : topk_query_result_array) { - auto result_id = result.query_result_arrays[0].id; - auto search_id = search_record_array[index++].first; - if(result_id != search_id) { - std::cout << "The top 1 result is wrong: " << result_id - << " vs. " << search_id << std::endl; - } else { - std::cout << "Check result sucessfully" << std::endl; - } - } - BLOCK_SPLITER +class TimeRecorder { + public: + explicit TimeRecorder(const std::string& title) + : title_(title) { + start_ = std::chrono::system_clock::now(); } - void DoSearch(std::shared_ptr conn, - const std::vector>& search_record_array, - const std::string& phase_name) { - std::vector query_range_array; - Range rg; - rg.start_value = CurrentTmDate(); - rg.end_value = CurrentTmDate(1); - query_range_array.emplace_back(rg); - - std::vector record_array; - for(auto& pair : search_record_array) { - record_array.push_back(pair.second); - } - - auto start = std::chrono::high_resolution_clock::now(); - for (auto i = 0; i < 5; ++i) { - std::vector topk_query_result_array; - { - TimeRecorder rc(phase_name); - Status stat = conn->Search("zilliz_face", record_array, query_range_array, TOP_K, 10, topk_query_result_array); - std::cout << "SearchVector function call status: " << stat.ToString() << std::endl; - } - if (i == 0) { - PrintSearchResult(search_record_array, topk_query_result_array); - } - } - auto finish = std::chrono::high_resolution_clock::now(); - std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast>(finish - start).count() << "s\n"; - - -// CheckResult(search_record_array, topk_query_result_array); + ~TimeRecorder() { + std::chrono::system_clock::time_point end = std::chrono::system_clock::now(); + long span = (std::chrono::duration_cast (end - start_)).count(); + std::cout << title_ << " totally cost: " << span << " ms" << std::endl; } + + private: + std::string title_; + std::chrono::system_clock::time_point start_; +}; + +void CheckResult(const std::vector>& search_record_array, + const std::vector& topk_query_result_array) { + BLOCK_SPLITER + int64_t index = 0; + for(auto& result : topk_query_result_array) { + auto result_id = result.query_result_arrays[0].id; + auto search_id = search_record_array[index++].first; + if(result_id != search_id) { + std::cout << "The top 1 result is wrong: " << result_id + << " vs. " << search_id << std::endl; + } else { + std::cout << "Check result sucessfully" << std::endl; + } + } + BLOCK_SPLITER +} + +void DoSearch(std::shared_ptr conn, + const std::vector>& search_record_array, + const std::string& phase_name) { + std::vector query_range_array; + Range rg; + rg.start_value = CurrentTmDate(); + rg.end_value = CurrentTmDate(1); + query_range_array.emplace_back(rg); + + std::vector record_array; + for(auto& pair : search_record_array) { + record_array.push_back(pair.second); + } + + auto start = std::chrono::high_resolution_clock::now(); + std::vector topk_query_result_array; + { + TimeRecorder rc(phase_name); + Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array); + std::cout << "SearchVector function call status: " << stat.ToString() << std::endl; + } + auto finish = std::chrono::high_resolution_clock::now(); + std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast>(finish - start).count() << "s\n"; + + PrintSearchResult(search_record_array, topk_query_result_array); + CheckResult(search_record_array, topk_query_result_array); +} } void @@ -219,9 +214,9 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::cout << "All tables: " << std::endl; for(auto& table : tables) { int64_t row_count = 0; -// conn->DropTable(table); - stat = conn->CountTable(table, row_count); - std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl; + conn->DropTable(table); +// stat = conn->CountTable(table, row_count); +// std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl; } } @@ -276,7 +271,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { if(search_record_array.size() < NQ) { search_record_array.push_back( - std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET])); + std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET])); } } } @@ -287,7 +282,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { int64_t row_count = 0; Status stat = conn->CountTable(TABLE_NAME, row_count); std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl; -// DoSearch(conn, search_record_array, "Search without index"); + DoSearch(conn, search_record_array, "Search without index"); } {//wait unit build index finish @@ -311,19 +306,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { } {//search vectors after build index finish - std::vector> search_array; - std::vector row_record_array; - row_record_array.resize(NQ); - for (int64_t i = 0; i < NQ; ++i) { - row_record_array[i].data.resize(TABLE_DIMENSION); - for (auto j = 0; j < TABLE_DIMENSION; ++j) { - row_record_array[i].data[j] = 1; - } - search_array.push_back(std::make_pair(i, row_record_array[i])); - } - - DoSearch(conn, search_array, "Search after build index finish"); - + DoSearch(conn, search_record_array, "Search after build index finish"); // std::cout << conn->DumpTaskTables() << std::endl; } @@ -360,4 +343,4 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::string status = conn->ServerStatus(); std::cout << "Server status after disconnect: " << status << std::endl; } -} \ No newline at end of file +} diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index 287fe51128..ac666c86a9 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -42,5 +42,5 @@ add_subdirectory(server) add_subdirectory(db) add_subdirectory(knowhere) add_subdirectory(metrics) -add_subdirectory(scheduler) +#add_subdirectory(scheduler) #add_subdirectory(storage) \ No newline at end of file From cdd2b08901eb01cee07584fc27d650c30ef408f5 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 2 Sep 2019 15:49:34 +0800 Subject: [PATCH 4/6] Add changelog Former-commit-id: e6c748e6e697deab5ce38e71c694b3dd8bc1d7be --- cpp/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 67a17013f4..9c5fc5b7f6 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -74,6 +74,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-442 - Merge Knowhere - MS-445 - Rename CopyCompleted to LoadCompleted - MS-451 - Update server_config.template file, set GPU compute default +- MS-455 - Distribute tasks by minimal cost in scheduler ## New Feature - MS-343 - Implement ResourceMgr From 600387fba4528ec99154241cb9fe276c8ae9391b Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 2 Sep 2019 15:53:11 +0800 Subject: [PATCH 5/6] fix bug Former-commit-id: eb4a91c3bd2ddbe22994c8c5d856cfa6dce31fee --- cpp/src/scheduler/resource/Resource.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index f984cb4b46..5395eafb42 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -204,8 +204,8 @@ class Resource : public Node, public std::enable_shared_from_this { TaskTable task_table_; - uint64_t total_cost_ = 10; - uint64_t total_task_ = 10; + uint64_t total_cost_ = 0; + uint64_t total_task_ = 0; std::map> register_table_; std::function subscriber_ = nullptr; From d1d220561b0ba5bd7f9f2868f430e683ca1adbb2 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 2 Sep 2019 17:11:26 +0800 Subject: [PATCH 6/6] MS-460 Put transport speed as weight when choosing neighbour to execute task Former-commit-id: d82b330df300d269a3a11544fc5625ea317f9118 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/SchedInst.cpp | 19 +++++++--- .../scheduler/action/PushTaskToNeighbour.cpp | 37 +++++++++++++++++-- cpp/src/server/ServerConfig.h | 2 + 4 files changed, 50 insertions(+), 9 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 9c5fc5b7f6..f69311adbb 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -75,6 +75,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-445 - Rename CopyCompleted to LoadCompleted - MS-451 - Update server_config.template file, set GPU compute default - MS-455 - Distribute tasks by minimal cost in scheduler +- MS-460 - Put transport speed as weight when choosing neighbour to execute task ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 3ee8cbfdb6..43204f0946 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -43,14 +43,21 @@ StartSchedulerService() { knowhere::FaissGpuResourceMgr::GetInstance().InitResource(); - auto default_connection = Connection("default_connection", 500.0); - auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS); +// auto default_connection = Connection("default_connection", 500.0); + auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren(); for (auto &conn : connections) { - std::string delimiter = "==="; - std::string left = conn.substr(0, conn.find(delimiter)); - std::string right = conn.substr(conn.find(delimiter) + 3, conn.length()); + auto &connect_name = conn.first; + auto &connect_conf = conn.second; + auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS); + auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS); - ResMgrInst::GetInstance()->Connect(left, right, default_connection); + std::string delimiter = "==="; + std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter)); + std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3, + connect_endpoint.length()); + + auto connection = Connection(connect_name, connect_speed); + ResMgrInst::GetInstance()->Connect(left, right, connection); } ResMgrInst::GetInstance()->Start(); diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 1939cbc127..200f6214fe 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -28,17 +28,48 @@ get_neighbours(const ResourcePtr &self) { return neighbours; } +std::vector> +get_neighbours_with_connetion(const ResourcePtr &self) { + std::vector> neighbours; + for (auto &neighbour_node : self->GetNeighbours()) { + auto node = neighbour_node.neighbour_node.lock(); + if (not node) continue; + + auto resource = std::static_pointer_cast(node); +// if (not resource->HasExecutor()) continue; + Connection conn = neighbour_node.connection; + neighbours.emplace_back(std::make_pair(resource, conn)); + } + return neighbours; +} + void Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) { - auto neighbours = get_neighbours(self); + auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { + std::vector speeds; + uint64_t total_speed = 0; + for (auto &neighbour : neighbours) { + uint64_t speed = neighbour.second.speed(); + speeds.emplace_back(speed); + total_speed += speed; + } + std::random_device rd; std::mt19937 mt(rd()); - std::uniform_int_distribution dist(0, neighbours.size() - 1); + std::uniform_int_distribution dist(0, total_speed); + uint64_t index = 0; + int64_t rd_speed = dist(mt); + for (uint64_t i = 0; i < speeds.size(); ++i) { + rd_speed -= speeds[i]; + if (rd_speed <= 0) { + neighbours[i].first->task_table().Put(task); + return; + } + } - neighbours[dist(mt)]->task_table().Put(task); } else { //TODO: process } diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index f8f6deea98..4b4a80e8da 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -57,6 +57,8 @@ static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id"; static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader"; static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor"; static const char* CONFIG_RESOURCE_CONNECTIONS = "connections"; +static const char* CONFIG_SPEED_CONNECTIONS = "speed"; +static const char* CONFIG_ENDPOINT_CONNECTIONS = "connections"; class ServerConfig {