Support build index with multiple gpu

This commit is contained in:
fishpenguin 2019-11-18 11:38:48 +08:00
parent 49ec7e1a7e
commit 513ad3b842
15 changed files with 233 additions and 117 deletions

View File

@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#12 - Pure CPU version for Milvus
- \#77 - Support table partition
- \#226 - Experimental shards middleware for Milvus
- \#346 - Support build index with multiple gpu
## Improvement
- \#275 - Rename C++ SDK IndexType

View File

@ -40,4 +40,5 @@ engine_config:
resource_config:
search_resources: # define the device used for search computation
- cpu
index_build_device: cpu # CPU used for building index
index_build_device: # CPU used for building index
- cpu

View File

@ -42,4 +42,5 @@ resource_config:
search_resources: # define the devices used for search computation, must be in format: cpu or gpux
- cpu
- gpu0
index_build_device: gpu0 # CPU / GPU used for building index, must be in format: cpu or gpux
index_build_device: # CPU / GPU used for building index, must be in format: cpu or gpux
- gpu0

View File

@ -570,12 +570,19 @@ ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
Status
ExecutionEngineImpl::Init() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetResourceConfigIndexBuildDevice(gpu_num_);
std::vector<int64_t> gpu_ids;
Status s = config.GetResourceConfigIndexBuildDevice(gpu_ids);
if (!s.ok()) {
return s;
}
for (auto id : gpu_ids) {
if (gpu_num_ == id) {
return Status::OK();
}
}
return Status::OK();
std::string msg = "Invalid gpu_num";
return Status(SERVER_INVALID_ARGUMENT, msg);
}
} // namespace engine

View File

@ -104,7 +104,7 @@ JobMgr::build_task(const JobPtr& job) {
void
JobMgr::calculate_path(const TaskPtr& task) {
if (task->type_ != TaskType::SearchTask) {
if (task->type_ != TaskType::SearchTask && task->type_ != TaskType::BuildIndexTask) {
return;
}

View File

@ -54,8 +54,8 @@ load_simple_config() {
// get resources
auto gpu_ids = get_gpu_pool();
int32_t build_gpu_id;
config.GetResourceConfigIndexBuildDevice(build_gpu_id);
std::vector<int64_t> build_gpu_ids;
config.GetResourceConfigIndexBuildDevice(build_gpu_ids);
// create and connect
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
@ -65,19 +65,30 @@ load_simple_config() {
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
auto pcie = Connection("pcie", 12000);
bool find_build_gpu_id = false;
for (auto& gpu_id : gpu_ids) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
if (build_gpu_id == gpu_id) {
find_build_gpu_id = true;
std::vector<int64_t> not_find_build_ids;
for (auto& build_id : build_gpu_ids) {
bool find_gpu_id = false;
for (auto& gpu_id : gpu_ids) {
if (gpu_id == build_id) {
find_gpu_id = true;
break;
}
}
if (not find_gpu_id) {
not_find_build_ids.emplace_back(build_id);
}
}
if (not find_build_gpu_id) {
for (auto& gpu_id : gpu_ids) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
}
for (auto& not_find_id : not_find_build_ids) {
ResMgrInst::GetInstance()->Add(
ResourceFactory::Create(std::to_string(build_gpu_id), "GPU", build_gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(build_gpu_id), pcie);
ResourceFactory::Create(std::to_string(not_find_id), "GPU", not_find_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(not_find_id), pcie);
}
}

View File

@ -21,6 +21,7 @@
#include "JobMgr.h"
#include "ResourceMgr.h"
#include "Scheduler.h"
#include "optimizer/BuildIndexPass.h"
#include "optimizer/HybridPass.h"
#include "optimizer/LargeSQ8HPass.h"
#include "optimizer/OnlyCPUPass.h"
@ -107,11 +108,15 @@ class OptimizerInst {
}
}
std::vector<int64_t> build_resources;
config.GetResourceConfigIndexBuildDevice(build_resources);
std::vector<PassPtr> pass_list;
pass_list.push_back(std::make_shared<LargeSQ8HPass>());
pass_list.push_back(std::make_shared<HybridPass>());
pass_list.push_back(std::make_shared<OnlyCPUPass>());
pass_list.push_back(std::make_shared<OnlyGPUPass>(has_cpu));
pass_list.push_back(std::make_shared<BuildIndexPass>(build_resources));
instance = std::make_shared<Optimizer>(pass_list);
}
}

View File

@ -138,73 +138,41 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou
std::shared_ptr<LoadCompletedEvent> event) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto& res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
// if (task->job_.lock()->type() == JobType::SEARCH) {
// auto label = task->label();
// auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
// if (spec_label->resource().lock()->type() == ResourceType::CPU) {
// std::vector<std::string> spec_path;
// spec_path.push_back(spec_label->resource().lock()->name());
// spec_path.push_back(resource->name());
// task->path() = Path(spec_path, spec_path.size() - 1);
// } else {
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
// uint64_t min_cost_idx = 0;
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->TotalTasks() == 0) {
// min_cost_idx = i;
// break;
// }
// uint64_t cost = compute_resources[i]->TaskAvgCost() *
// compute_resources[i]->NumOfTaskToExec() +
// transport_costs[i];
// if (min_cost > cost) {
// min_cost = cost;
// min_cost_idx = i;
// }
// }
//
// // step 3: set path in task
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
// task->path() = task_path;
// }
//
// } else
if (task->job_.lock()->type() == JobType::BUILD) {
// step2: Read device id in config
// get build index gpu resource
server::Config& config = server::Config::GetInstance();
int32_t build_index_gpu;
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
bool find_gpu_res = false;
if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->name() ==
res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
}
}
}
if (not find_gpu_res) {
task->path() = Path(paths[0], paths[0].size() - 1);
}
}
}
// if (resource->type() == ResourceType::DISK) {
// // step 1: calculate shortest path per resource, from disk to compute resource
// auto compute_resources = res_mgr->GetComputeResources();
// std::vector<std::vector<std::string>> paths;
// std::vector<uint64_t> transport_costs;
// for (auto& res : compute_resources) {
// std::vector<std::string> path;
// uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
// transport_costs.push_back(transport_cost);
// paths.emplace_back(path);
// }
// if (task->job_.lock()->type() == JobType::BUILD) {
// // step2: Read device id in config
// // get build index gpu resource
// server::Config& config = server::Config::GetInstance();
// int32_t build_index_gpu;
// Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
//
// bool find_gpu_res = false;
// if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->name() ==
// res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
// find_gpu_res = true;
// Path task_path(paths[i], paths[i].size() - 1);
// task->path() = task_path;
// break;
// }
// }
// }
// if (not find_gpu_res) {
// task->path() = Path(paths[0], paths[0].size() - 1);
// }
// }
// }
if (resource->name() == task->path().Last()) {
resource->WakeupExecutor();

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/BuildIndexPass.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
BuildIndexPass::BuildIndexPass(std::vector<int64_t>& build_gpu_ids) : build_gpu_ids_(build_gpu_ids) {
}
bool
BuildIndexPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::BuildIndexTask)
return false;
if (build_gpu_ids_.empty())
return false;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, build_gpu_ids_[specified_gpu_id_]);
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
specified_gpu_id_ = (specified_gpu_id_ + 1) % build_gpu_ids_.size();
return true;
}
} // namespace scheduler
} // namespace milvus

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "Pass.h"
namespace milvus {
namespace scheduler {
class BuildIndexPass : public Pass {
public:
explicit BuildIndexPass(std::vector<int64_t>& build_gpu_id);
public:
bool
Run(const TaskPtr& task) override;
private:
uint64_t specified_gpu_id_ = 0;
std::vector<int64_t> build_gpu_ids_;
};
using BuildIndexPassPtr = std::shared_ptr<BuildIndexPass>;
} // namespace scheduler
} // namespace milvus

View File

@ -41,12 +41,11 @@ OnlyGPUPass::Run(const TaskPtr& task) {
auto gpu_id = get_gpu_pool();
if (gpu_id.empty())
return false;
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, gpu_id[specified_gpu_id_]);
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
specified_gpu_id_ = specified_gpu_id_++ % gpu_id.size();
specified_gpu_id_ = (specified_gpu_id_ + 1) % gpu_id.size();
return true;
}

View File

@ -215,8 +215,8 @@ Config::ValidateConfig() {
return s;
}
int32_t resource_index_build_device;
s = GetResourceConfigIndexBuildDevice(resource_index_build_device);
std::vector<int64_t> index_build_devices;
s = GetResourceConfigIndexBuildDevice(index_build_devices);
if (!s.ok()) {
return s;
}
@ -599,22 +599,24 @@ Config::CheckCacheConfigGpuCacheCapacity(const std::string& value) {
return Status(SERVER_INVALID_ARGUMENT, msg);
} else {
uint64_t gpu_cache_capacity = std::stoi(value) * GB;
int device_id;
Status s = GetResourceConfigIndexBuildDevice(device_id);
std::vector<int64_t> device_ids;
Status s = GetResourceConfigIndexBuildDevice(device_ids);
if (!s.ok()) {
return s;
}
size_t gpu_memory;
if (!ValidationUtil::GetGpuMemory(device_id, gpu_memory).ok()) {
std::string msg = "Fail to get GPU memory for GPU device: " + std::to_string(device_id);
return Status(SERVER_UNEXPECTED_ERROR, msg);
} else if (gpu_cache_capacity >= gpu_memory) {
std::string msg = "Invalid gpu cache capacity: " + value +
". Possible reason: cache_config.gpu_cache_capacity exceeds GPU memory.";
return Status(SERVER_INVALID_ARGUMENT, msg);
} else if (gpu_cache_capacity > (double)gpu_memory * 0.9) {
std::cerr << "Warning: gpu cache capacity value is too big" << std::endl;
for (auto& device_id : device_ids) {
if (!ValidationUtil::GetGpuMemory(device_id, gpu_memory).ok()) {
std::string msg = "Fail to get GPU memory for GPU device: " + std::to_string(device_id);
return Status(SERVER_UNEXPECTED_ERROR, msg);
} else if (gpu_cache_capacity >= gpu_memory) {
std::string msg = "Invalid gpu cache capacity: " + value +
". Possible reason: cache_config.gpu_cache_capacity exceeds GPU memory.";
return Status(SERVER_INVALID_ARGUMENT, msg);
} else if (gpu_cache_capacity > (double)gpu_memory * 0.9) {
std::cerr << "Warning: gpu cache capacity value is too big" << std::endl;
}
}
}
return Status::OK();
@ -745,11 +747,21 @@ Config::CheckResourceConfigSearchResources(const std::vector<std::string>& value
}
Status
Config::CheckResourceConfigIndexBuildDevice(const std::string& value) {
auto status = CheckResource(value);
if (!status.ok()) {
return Status(SERVER_INVALID_ARGUMENT, status.message());
Config::CheckResourceConfigIndexBuildDevice(const std::vector<std::string>& value) {
if (value.empty()) {
std::string msg =
"Invalid index build resource. "
"Possible reason: resource_config.index_build_device is empty.";
return Status(SERVER_INVALID_ARGUMENT, msg);
}
for (auto& resource : value) {
auto status = CheckResource(resource);
if (!status.ok()) {
return Status(SERVER_INVALID_ARGUMENT, status.message());
}
}
return Status::OK();
}
@ -1036,18 +1048,25 @@ Config::GetResourceConfigSearchResources(std::vector<std::string>& value) {
}
Status
Config::GetResourceConfigIndexBuildDevice(int32_t& value) {
Config::GetResourceConfigIndexBuildDevice(std::vector<int64_t>& value) {
std::string str =
GetConfigStr(CONFIG_RESOURCE, CONFIG_RESOURCE_INDEX_BUILD_DEVICE, CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT);
Status s = CheckResourceConfigIndexBuildDevice(str);
GetConfigSequenceStr(CONFIG_RESOURCE, CONFIG_RESOURCE_INDEX_BUILD_DEVICE, CONFIG_RESOURCE_INDEX_BUILD_DELIMITER,
CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT);
std::vector<std::string> resources;
server::StringHelpFunctions::SplitStringByDelimeter(str, CONFIG_RESOURCE_INDEX_BUILD_DELIMITER, resources);
Status s = CheckResourceConfigIndexBuildDevice(resources);
if (!s.ok()) {
return s;
}
if (str == "cpu") {
value = CPU_DEVICE_ID;
} else {
value = std::stoi(str.substr(3));
for (auto res : resources) {
if (res == "cpu") {
value.emplace_back(CPU_DEVICE_ID);
break;
}
int64_t device_id = std::stoi(str.substr(3));
value.emplace_back(device_id);
}
return Status::OK();
@ -1318,7 +1337,10 @@ Config::SetResourceConfigSearchResources(const std::string& value) {
Status
Config::SetResourceConfigIndexBuildDevice(const std::string& value) {
Status s = CheckResourceConfigIndexBuildDevice(value);
std::vector<std::string> res_vec;
server::StringHelpFunctions::SplitStringByDelimeter(value, CONFIG_RESOURCE_INDEX_BUILD_DELIMITER, res_vec);
Status s = CheckResourceConfigIndexBuildDevice(res_vec);
if (!s.ok()) {
return s;
}

View File

@ -101,10 +101,11 @@ static const char* CONFIG_RESOURCE_SEARCH_RESOURCES_DEFAULT = "cpu,gpu0";
#endif
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE = "index_build_device";
static const char* CONFIG_RESOURCE_INDEX_BUILD_DELIMITER = ",";
#ifdef MILVUS_CPU_VERSION
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "cpu";
#else
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "gpu0";
static const char* CONFIG_RESOURCE_INDEX_BUILD_DEVICE_DEFAULT = "cpu,gpu0";
#endif
const int32_t CPU_DEVICE_ID = -1;
@ -190,7 +191,7 @@ class Config {
Status
CheckResourceConfigSearchResources(const std::vector<std::string>& value);
Status
CheckResourceConfigIndexBuildDevice(const std::string& value);
CheckResourceConfigIndexBuildDevice(const std::vector<std::string>& value);
std::string
GetConfigStr(const std::string& parent_key, const std::string& child_key, const std::string& default_value = "");
@ -259,7 +260,7 @@ class Config {
Status
GetResourceConfigSearchResources(std::vector<std::string>& value);
Status
GetResourceConfigIndexBuildDevice(int32_t& value);
GetResourceConfigIndexBuildDevice(std::vector<int64_t>& value);
public:
/* server config */

View File

@ -48,12 +48,14 @@ KnowhereResource::Initialize() {
// get build index gpu resource
server::Config& config = server::Config::GetInstance();
int32_t build_index_gpu;
s = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
std::vector<int64_t> build_index_gpus;
s = config.GetResourceConfigIndexBuildDevice(build_index_gpus);
if (!s.ok())
return s;
gpu_resources.insert(std::make_pair(build_index_gpu, GpuResourceSetting()));
for (auto gpu_id : build_index_gpus) {
gpu_resources.insert(std::make_pair(gpu_id, GpuResourceSetting()));
}
// get search gpu resource
std::vector<std::string> pool;

View File

@ -292,7 +292,8 @@ TEST_F(ConfigTest, SERVER_CONFIG_VALID_TEST) {
s = config.SetResourceConfigIndexBuildDevice("gpu" + std::to_string(resource_index_build_device));
#endif
ASSERT_TRUE(s.ok());
s = config.GetResourceConfigIndexBuildDevice(int32_val);
std::vector<int64_t> device_ids;
s = config.GetResourceConfigIndexBuildDevice(device_ids);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(int32_val == resource_index_build_device);
}