mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-29 23:15:28 +08:00
Merge branch 'branch-0.5.0' into 'branch-0.5.0'
SQ8H in GPU See merge request megasearch/milvus!701 Former-commit-id: 8ee14aab943cbe9a979787a7169fe6c2f02f0382
This commit is contained in:
commit
0cd2e2a0c9
@ -115,6 +115,19 @@ IVF::Search(const DatasetPtr& dataset, const Config& config) {
|
||||
|
||||
search_impl(rows, (float*)p_data, search_cfg->k, res_dis, res_ids, config);
|
||||
|
||||
// std::stringstream ss_res_id, ss_res_dist;
|
||||
// for (int i = 0; i < 10; ++i) {
|
||||
// printf("%llu", res_ids[i]);
|
||||
// printf("\n");
|
||||
// printf("%.6f", res_dis[i]);
|
||||
// printf("\n");
|
||||
// ss_res_id << res_ids[i] << " ";
|
||||
// ss_res_dist << res_dis[i] << " ";
|
||||
// }
|
||||
// std::cout << std::endl << "after search: " << std::endl;
|
||||
// std::cout << ss_res_id.str() << std::endl;
|
||||
// std::cout << ss_res_dist.str() << std::endl << std::endl;
|
||||
|
||||
auto id_buf = MakeMutableBufferSmart((uint8_t*)res_ids, sizeof(int64_t) * elems);
|
||||
auto dist_buf = MakeMutableBufferSmart((uint8_t*)res_dis, sizeof(float) * elems);
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
// under the License.
|
||||
|
||||
#include "knowhere/index/vector_index/IndexIVFSQHybrid.h"
|
||||
#include <utility>
|
||||
#include "faiss/AutoTune.h"
|
||||
#include "faiss/gpu/GpuAutoTune.h"
|
||||
#include "faiss/gpu/GpuIndexIVF.h"
|
||||
@ -79,20 +80,8 @@ IVFSQHybrid::CopyGpuToCpu(const Config& config) {
|
||||
VectorIndexPtr
|
||||
IVFSQHybrid::CopyCpuToGpu(const int64_t& device_id, const Config& config) {
|
||||
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) {
|
||||
ResScope rs(res, device_id, false);
|
||||
faiss::gpu::GpuClonerOptions option;
|
||||
option.allInGpu = true;
|
||||
|
||||
faiss::IndexComposition index_composition;
|
||||
index_composition.index = index_.get();
|
||||
index_composition.quantizer = nullptr;
|
||||
index_composition.mode = 0; // copy all
|
||||
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, &index_composition, &option);
|
||||
|
||||
std::shared_ptr<faiss::Index> device_index;
|
||||
device_index.reset(gpu_index);
|
||||
return std::make_shared<IVFSQHybrid>(device_index, device_id, res);
|
||||
auto p = CopyCpuToGpuWithQuantizer(device_id, config);
|
||||
return p.first;
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
|
||||
}
|
||||
@ -180,7 +169,7 @@ IVFSQHybrid::UnsetQuantizer() {
|
||||
ivf_index->quantizer = nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
VectorIndexPtr
|
||||
IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
|
||||
auto quantizer_conf = std::dynamic_pointer_cast<QuantizerCfg>(conf);
|
||||
if (quantizer_conf != nullptr) {
|
||||
@ -188,9 +177,10 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
|
||||
KNOWHERE_THROW_MSG("mode only support 2 in this func");
|
||||
}
|
||||
}
|
||||
if (quantizer_conf->gpu_id != gpu_id_) {
|
||||
KNOWHERE_THROW_MSG("quantizer and data must on the same gpu card");
|
||||
}
|
||||
// if (quantizer_conf->gpu_id != gpu_id_) {
|
||||
// KNOWHERE_THROW_MSG("quantizer and data must on the same gpu card");
|
||||
// }
|
||||
gpu_id_ = quantizer_conf->gpu_id;
|
||||
|
||||
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
|
||||
ResScope rs(res, gpu_id_, false);
|
||||
@ -207,8 +197,37 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
|
||||
index_composition->mode = quantizer_conf->mode; // only 2
|
||||
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index_composition, &option);
|
||||
index_.reset(gpu_index);
|
||||
gpu_mode = 2; // all in gpu
|
||||
std::shared_ptr<faiss::Index> new_idx;
|
||||
new_idx.reset(gpu_index);
|
||||
auto sq_idx = std::make_shared<IVFSQHybrid>(new_idx, gpu_id_, res);
|
||||
return sq_idx;
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<VectorIndexPtr, QuantizerPtr>
|
||||
IVFSQHybrid::CopyCpuToGpuWithQuantizer(const int64_t& device_id, const Config& config) {
|
||||
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) {
|
||||
ResScope rs(res, device_id, false);
|
||||
faiss::gpu::GpuClonerOptions option;
|
||||
option.allInGpu = true;
|
||||
|
||||
faiss::IndexComposition index_composition;
|
||||
index_composition.index = index_.get();
|
||||
index_composition.quantizer = nullptr;
|
||||
index_composition.mode = 0; // copy all
|
||||
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, &index_composition, &option);
|
||||
|
||||
std::shared_ptr<faiss::Index> device_index;
|
||||
device_index.reset(gpu_index);
|
||||
auto new_idx = std::make_shared<IVFSQHybrid>(device_index, device_id, res);
|
||||
|
||||
auto q = std::make_shared<FaissIVFQuantizer>();
|
||||
q->quantizer = index_composition.quantizer;
|
||||
q->size = index_composition.quantizer->d * index_composition.quantizer->getNumVecs() * sizeof(float);
|
||||
return std::make_pair(new_idx, q);
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
|
||||
#include <faiss/index_io.h>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
#include "IndexGPUIVFSQ.h"
|
||||
#include "Quantizer.h"
|
||||
@ -60,9 +61,12 @@ class IVFSQHybrid : public GPUIVFSQ {
|
||||
void
|
||||
UnsetQuantizer();
|
||||
|
||||
void
|
||||
VectorIndexPtr
|
||||
LoadData(const knowhere::QuantizerPtr& q, const Config& conf);
|
||||
|
||||
std::pair<VectorIndexPtr, QuantizerPtr>
|
||||
CopyCpuToGpuWithQuantizer(const int64_t& device_id, const Config& config);
|
||||
|
||||
IndexModelPtr
|
||||
Train(const DatasetPtr& dataset, const Config& config) override;
|
||||
|
||||
|
||||
@ -243,23 +243,23 @@ TEST_P(IVFTest, hybrid) {
|
||||
hybrid_1_idx->UnsetQuantizer();
|
||||
}
|
||||
|
||||
// {
|
||||
// auto hybrid_2_idx = std::make_shared<knowhere::IVFSQHybrid>(device_id);
|
||||
//
|
||||
// auto binaryset = index_->Serialize();
|
||||
// hybrid_2_idx->Load(binaryset);
|
||||
//
|
||||
// auto quantizer_conf = std::make_shared<knowhere::QuantizerCfg>();
|
||||
// quantizer_conf->mode = 1;
|
||||
// quantizer_conf->gpu_id = device_id;
|
||||
// auto q = hybrid_2_idx->LoadQuantizer(quantizer_conf);
|
||||
// quantizer_conf->mode = 2;
|
||||
// hybrid_2_idx->LoadData(q, quantizer_conf);
|
||||
//
|
||||
// auto result = hybrid_2_idx->Search(query_dataset, conf);
|
||||
// AssertAnns(result, nq, conf->k);
|
||||
// PrintResult(result, nq, k);
|
||||
// }
|
||||
{
|
||||
auto hybrid_2_idx = std::make_shared<knowhere::IVFSQHybrid>(device_id);
|
||||
|
||||
auto binaryset = index_->Serialize();
|
||||
hybrid_2_idx->Load(binaryset);
|
||||
|
||||
auto quantizer_conf = std::make_shared<knowhere::QuantizerCfg>();
|
||||
quantizer_conf->mode = 1;
|
||||
quantizer_conf->gpu_id = device_id;
|
||||
auto q = hybrid_2_idx->LoadQuantizer(quantizer_conf);
|
||||
quantizer_conf->mode = 2;
|
||||
auto gpu_idx = hybrid_2_idx->LoadData(q, quantizer_conf);
|
||||
|
||||
auto result = gpu_idx->Search(query_dataset, conf);
|
||||
AssertAnns(result, nq, conf->k);
|
||||
PrintResult(result, nq, k);
|
||||
}
|
||||
}
|
||||
|
||||
// TEST_P(IVFTest, gpu_to_cpu) {
|
||||
|
||||
@ -65,7 +65,7 @@ class ExecutionEngine {
|
||||
Load(bool to_cache = true) = 0;
|
||||
|
||||
virtual Status
|
||||
CopyToGpu(uint64_t device_id) = 0;
|
||||
CopyToGpu(uint64_t device_id, bool hybrid) = 0;
|
||||
|
||||
virtual Status
|
||||
CopyToIndexFileToGpu(uint64_t device_id) = 0;
|
||||
@ -80,7 +80,8 @@ class ExecutionEngine {
|
||||
Merge(const std::string& location) = 0;
|
||||
|
||||
virtual Status
|
||||
Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const = 0;
|
||||
Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
|
||||
bool hybrid) const = 0;
|
||||
|
||||
virtual std::shared_ptr<ExecutionEngine>
|
||||
BuildIndex(const std::string& location, EngineType engine_type) = 0;
|
||||
|
||||
@ -31,6 +31,7 @@
|
||||
#include "wrapper/ConfAdapter.h"
|
||||
#include "wrapper/ConfAdapterMgr.h"
|
||||
|
||||
#include <src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h>
|
||||
#include <src/scheduler/Utils.h>
|
||||
#include <stdexcept>
|
||||
#include <utility>
|
||||
@ -245,7 +246,31 @@ ExecutionEngineImpl::Load(bool to_cache) {
|
||||
}
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
|
||||
ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
|
||||
if (hybrid) {
|
||||
auto key = location_ + ".quantizer";
|
||||
auto quantizer =
|
||||
std::static_pointer_cast<CachedQuantizer>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(key));
|
||||
|
||||
auto conf = std::make_shared<knowhere::QuantizerCfg>();
|
||||
conf->gpu_id = device_id;
|
||||
|
||||
if (quantizer) {
|
||||
// cache hit
|
||||
conf->mode = 2;
|
||||
auto new_index = index_->LoadData(quantizer->Data(), conf);
|
||||
index_ = new_index;
|
||||
} else {
|
||||
auto pair = index_->CopyToGpuWithQuantizer(device_id);
|
||||
index_ = pair.first;
|
||||
|
||||
// cache
|
||||
auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
|
||||
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
|
||||
bool already_in_cache = (index != nullptr);
|
||||
if (already_in_cache) {
|
||||
@ -389,8 +414,8 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t
|
||||
}
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances,
|
||||
int64_t* labels) const {
|
||||
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
|
||||
bool hybrid) const {
|
||||
if (index_ == nullptr) {
|
||||
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
|
||||
return Status(DB_ERROR, "index is null");
|
||||
@ -406,11 +431,15 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr
|
||||
auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
|
||||
auto conf = adapter->MatchSearch(temp_conf, index_->GetType());
|
||||
|
||||
HybridLoad();
|
||||
if (hybrid) {
|
||||
HybridLoad();
|
||||
}
|
||||
|
||||
auto status = index_->Search(n, data, distances, labels, conf);
|
||||
|
||||
HybridUnset();
|
||||
if (hybrid) {
|
||||
HybridUnset();
|
||||
}
|
||||
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Search error";
|
||||
|
||||
@ -56,7 +56,7 @@ class ExecutionEngineImpl : public ExecutionEngine {
|
||||
Load(bool to_cache) override;
|
||||
|
||||
Status
|
||||
CopyToGpu(uint64_t device_id) override;
|
||||
CopyToGpu(uint64_t device_id, bool hybrid = false) override;
|
||||
|
||||
Status
|
||||
CopyToIndexFileToGpu(uint64_t device_id) override;
|
||||
@ -71,7 +71,8 @@ class ExecutionEngineImpl : public ExecutionEngine {
|
||||
Merge(const std::string& location) override;
|
||||
|
||||
Status
|
||||
Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const override;
|
||||
Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
|
||||
bool hybrid = false) const override;
|
||||
|
||||
ExecutionEnginePtr
|
||||
BuildIndex(const std::string& location, EngineType engine_type) override;
|
||||
|
||||
@ -19,9 +19,11 @@
|
||||
#include "SchedInst.h"
|
||||
#include "TaskCreator.h"
|
||||
#include "optimizer/Optimizer.h"
|
||||
#include "scheduler/Algorithm.h"
|
||||
#include "scheduler/optimizer/Optimizer.h"
|
||||
#include "scheduler/tasklabel/SpecResLabel.h"
|
||||
#include "task/Task.h"
|
||||
|
||||
#include <src/scheduler/optimizer/Optimizer.h>
|
||||
#include <utility>
|
||||
|
||||
namespace milvus {
|
||||
@ -73,6 +75,10 @@ JobMgr::worker_function() {
|
||||
OptimizerInst::GetInstance()->Run(task);
|
||||
}
|
||||
|
||||
for (auto& task : tasks) {
|
||||
calculate_path(task);
|
||||
}
|
||||
|
||||
// disk resources NEVER be empty.
|
||||
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
|
||||
for (auto& task : tasks) {
|
||||
@ -87,5 +93,23 @@ JobMgr::build_task(const JobPtr& job) {
|
||||
return TaskCreator::Create(job);
|
||||
}
|
||||
|
||||
void
|
||||
JobMgr::calculate_path(const TaskPtr& task) {
|
||||
if (task->type_ != TaskType::SearchTask) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<std::string> path;
|
||||
auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label());
|
||||
auto src = res_mgr_->GetDiskResources()[0];
|
||||
auto dest = spec_label->resource();
|
||||
ShortestPath(src.lock(), dest.lock(), res_mgr_, path);
|
||||
task->path() = Path(path, path.size() - 1);
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
||||
@ -52,9 +52,12 @@ class JobMgr {
|
||||
void
|
||||
worker_function();
|
||||
|
||||
std::vector<TaskPtr>
|
||||
static std::vector<TaskPtr>
|
||||
build_task(const JobPtr& job);
|
||||
|
||||
void
|
||||
calculate_path(const TaskPtr& task);
|
||||
|
||||
private:
|
||||
bool running_ = false;
|
||||
std::queue<JobPtr> queue_;
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
#include "ResourceMgr.h"
|
||||
#include "Scheduler.h"
|
||||
#include "optimizer/HybridPass.h"
|
||||
#include "optimizer/LargeSQ8HPass.h"
|
||||
#include "optimizer/Optimizer.h"
|
||||
|
||||
#include <memory>
|
||||
@ -91,9 +92,9 @@ class OptimizerInst {
|
||||
if (instance == nullptr) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (instance == nullptr) {
|
||||
HybridPassPtr pass_ptr = std::make_shared<HybridPass>();
|
||||
std::vector<PassPtr> pass_list;
|
||||
pass_list.push_back(pass_ptr);
|
||||
pass_list.push_back(std::make_shared<LargeSQ8HPass>());
|
||||
pass_list.push_back(std::make_shared<HybridPass>());
|
||||
instance = std::make_shared<Optimizer>(pass_list);
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,37 +145,39 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
|
||||
transport_costs.push_back(transport_cost);
|
||||
paths.emplace_back(path);
|
||||
}
|
||||
if (task->job_.lock()->type() == JobType::SEARCH) {
|
||||
auto label = task->label();
|
||||
auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
|
||||
if (spec_label->resource().lock()->type() == ResourceType::CPU) {
|
||||
std::vector<std::string> spec_path;
|
||||
spec_path.push_back(spec_label->resource().lock()->name());
|
||||
spec_path.push_back(resource->name());
|
||||
task->path() = Path(spec_path, spec_path.size() - 1);
|
||||
} else {
|
||||
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
|
||||
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t min_cost_idx = 0;
|
||||
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
if (compute_resources[i]->TotalTasks() == 0) {
|
||||
min_cost_idx = i;
|
||||
break;
|
||||
}
|
||||
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() +
|
||||
transport_costs[i];
|
||||
if (min_cost > cost) {
|
||||
min_cost = cost;
|
||||
min_cost_idx = i;
|
||||
}
|
||||
}
|
||||
|
||||
// step 3: set path in task
|
||||
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
|
||||
task->path() = task_path;
|
||||
}
|
||||
|
||||
} else if (task->job_.lock()->type() == JobType::BUILD) {
|
||||
// if (task->job_.lock()->type() == JobType::SEARCH) {
|
||||
// auto label = task->label();
|
||||
// auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
|
||||
// if (spec_label->resource().lock()->type() == ResourceType::CPU) {
|
||||
// std::vector<std::string> spec_path;
|
||||
// spec_path.push_back(spec_label->resource().lock()->name());
|
||||
// spec_path.push_back(resource->name());
|
||||
// task->path() = Path(spec_path, spec_path.size() - 1);
|
||||
// } else {
|
||||
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
|
||||
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
|
||||
// uint64_t min_cost_idx = 0;
|
||||
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
// if (compute_resources[i]->TotalTasks() == 0) {
|
||||
// min_cost_idx = i;
|
||||
// break;
|
||||
// }
|
||||
// uint64_t cost = compute_resources[i]->TaskAvgCost() *
|
||||
// compute_resources[i]->NumOfTaskToExec() +
|
||||
// transport_costs[i];
|
||||
// if (min_cost > cost) {
|
||||
// min_cost = cost;
|
||||
// min_cost_idx = i;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // step 3: set path in task
|
||||
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
|
||||
// task->path() = task_path;
|
||||
// }
|
||||
//
|
||||
// } else
|
||||
if (task->job_.lock()->type() == JobType::BUILD) {
|
||||
// step2: Read device id in config
|
||||
// get build index gpu resource
|
||||
server::Config& config = server::Config::GetInstance();
|
||||
@ -201,12 +203,13 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
|
||||
}
|
||||
|
||||
if (resource->name() == task->path().Last()) {
|
||||
resource->WakeupLoader();
|
||||
resource->WakeupExecutor();
|
||||
} else {
|
||||
auto next_res_name = task->path().Next();
|
||||
auto next_res = res_mgr.lock()->GetResource(next_res_name);
|
||||
event->task_table_item_->Move();
|
||||
next_res->task_table().Put(task);
|
||||
if (event->task_table_item_->Move()) {
|
||||
next_res->task_table().Put(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
73
cpp/src/scheduler/optimizer/LargeSQ8HPass.cpp
Normal file
73
cpp/src/scheduler/optimizer/LargeSQ8HPass.cpp
Normal file
@ -0,0 +1,73 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "scheduler/optimizer/LargeSQ8HPass.h"
|
||||
#include "cache/GpuCacheMgr.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
#include "scheduler/Utils.h"
|
||||
#include "scheduler/task/SearchTask.h"
|
||||
#include "scheduler/tasklabel/SpecResLabel.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
bool
|
||||
LargeSQ8HPass::Run(const TaskPtr& task) {
|
||||
if (task->Type() != TaskType::SearchTask) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto search_task = std::static_pointer_cast<XSearchTask>(task);
|
||||
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8H) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
|
||||
|
||||
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
|
||||
if (search_job->nq() < 100) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
|
||||
std::vector<int64_t> all_free_mem;
|
||||
for (auto& gpu : gpus) {
|
||||
auto cache = cache::GpuCacheMgr::GetInstance(gpu);
|
||||
auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
|
||||
all_free_mem.push_back(free_mem);
|
||||
}
|
||||
|
||||
auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
|
||||
auto best_index = std::distance(all_free_mem.begin(), max_e);
|
||||
auto best_device_id = gpus[best_index];
|
||||
|
||||
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
|
||||
if (not res_ptr) {
|
||||
SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid.";
|
||||
// TODO: throw critical error and exit
|
||||
return false;
|
||||
}
|
||||
|
||||
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
|
||||
task->label() = label;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
47
cpp/src/scheduler/optimizer/LargeSQ8HPass.h
Normal file
47
cpp/src/scheduler/optimizer/LargeSQ8HPass.h
Normal file
@ -0,0 +1,47 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "Pass.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
class LargeSQ8HPass : public Pass {
|
||||
public:
|
||||
LargeSQ8HPass() = default;
|
||||
|
||||
public:
|
||||
bool
|
||||
Run(const TaskPtr& task) override;
|
||||
};
|
||||
|
||||
using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>;
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
@ -19,6 +19,7 @@
|
||||
#include "scheduler/Utils.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <utility>
|
||||
|
||||
namespace milvus {
|
||||
@ -111,11 +112,18 @@ Resource::pick_task_load() {
|
||||
|
||||
TaskTableItemPtr
|
||||
Resource::pick_task_execute() {
|
||||
auto indexes = task_table_.PickToExecute(3);
|
||||
auto indexes = task_table_.PickToExecute(std::numeric_limits<uint64_t>::max());
|
||||
for (auto index : indexes) {
|
||||
// try to set one task executing, then return
|
||||
if (task_table_.Execute(index))
|
||||
if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) {
|
||||
if (task_table_[index]->task->path().Last() != name()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (task_table_.Execute(index)) {
|
||||
return task_table_.Get(index);
|
||||
}
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
#include <src/scheduler/SchedInst.h>
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
@ -121,7 +122,11 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
|
||||
stat = index_engine_->Load();
|
||||
type_str = "DISK2CPU";
|
||||
} else if (type == LoadType::CPU2GPU) {
|
||||
stat = index_engine_->CopyToGpu(device_id);
|
||||
bool hybrid = false;
|
||||
if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H) {
|
||||
hybrid = true;
|
||||
}
|
||||
stat = index_engine_->CopyToGpu(device_id, hybrid);
|
||||
type_str = "CPU2GPU";
|
||||
} else if (type == LoadType::GPU2CPU) {
|
||||
stat = index_engine_->CopyToCpu();
|
||||
@ -204,7 +209,12 @@ XSearchTask::Execute() {
|
||||
|
||||
try {
|
||||
// step 2: search
|
||||
index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data());
|
||||
bool hybrid = false;
|
||||
if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H &&
|
||||
ResMgrInst::GetInstance()->GetResource(path().Last())->type() == ResourceType::CPU) {
|
||||
hybrid = true;
|
||||
}
|
||||
index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data(), hybrid);
|
||||
|
||||
double span = rc.RecordSection(hdr + ", do search");
|
||||
// search_job->AccumSearchCost(span);
|
||||
|
||||
@ -315,24 +315,40 @@ IVFHybridIndex::UnsetQuantizer() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
VecIndexPtr
|
||||
IVFHybridIndex::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
|
||||
try {
|
||||
// TODO(linxj): Hardcode here
|
||||
if (auto new_idx = std::dynamic_pointer_cast<knowhere::IVFSQHybrid>(index_)) {
|
||||
new_idx->LoadData(q, conf);
|
||||
return std::make_shared<IVFHybridIndex>(new_idx->LoadData(q, conf), type);
|
||||
} else {
|
||||
WRAPPER_LOG_ERROR << "Hybrid mode not support for index type: " << int(type);
|
||||
return Status(KNOWHERE_ERROR, "not support");
|
||||
}
|
||||
} catch (knowhere::KnowhereException& e) {
|
||||
WRAPPER_LOG_ERROR << e.what();
|
||||
return Status(KNOWHERE_UNEXPECTED_ERROR, e.what());
|
||||
} catch (std::exception& e) {
|
||||
WRAPPER_LOG_ERROR << e.what();
|
||||
return Status(KNOWHERE_ERROR, e.what());
|
||||
}
|
||||
return Status::OK();
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::pair<VecIndexPtr, knowhere::QuantizerPtr>
|
||||
IVFHybridIndex::CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg) {
|
||||
try {
|
||||
// TODO(linxj): Hardcode here
|
||||
if (auto hybrid_idx = std::dynamic_pointer_cast<knowhere::IVFSQHybrid>(index_)) {
|
||||
auto pair = hybrid_idx->CopyCpuToGpuWithQuantizer(device_id, cfg);
|
||||
auto new_idx = std::make_shared<IVFHybridIndex>(pair.first, type);
|
||||
return std::make_pair(new_idx, pair.second);
|
||||
} else {
|
||||
WRAPPER_LOG_ERROR << "Hybrid mode not support for index type: " << int(type);
|
||||
}
|
||||
} catch (knowhere::KnowhereException& e) {
|
||||
WRAPPER_LOG_ERROR << e.what();
|
||||
} catch (std::exception& e) {
|
||||
WRAPPER_LOG_ERROR << e.what();
|
||||
}
|
||||
return std::make_pair(nullptr, nullptr);
|
||||
}
|
||||
|
||||
} // namespace engine
|
||||
|
||||
@ -105,8 +105,10 @@ class IVFHybridIndex : public IVFMixIndex {
|
||||
|
||||
Status
|
||||
UnsetQuantizer() override;
|
||||
std::pair<VecIndexPtr, knowhere::QuantizerPtr>
|
||||
CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg) override;
|
||||
|
||||
Status
|
||||
VecIndexPtr
|
||||
LoadData(const knowhere::QuantizerPtr& q, const Config& conf) override;
|
||||
};
|
||||
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "cache/DataObj.h"
|
||||
#include "knowhere/common/BinarySet.h"
|
||||
@ -103,9 +104,9 @@ class VecIndex : public cache::DataObj {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
virtual Status
|
||||
virtual VecIndexPtr
|
||||
LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
|
||||
return Status::OK();
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
virtual Status
|
||||
@ -117,6 +118,11 @@ class VecIndex : public cache::DataObj {
|
||||
UnsetQuantizer() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
virtual std::pair<VecIndexPtr, knowhere::QuantizerPtr>
|
||||
CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg = Config()) {
|
||||
return std::make_pair(nullptr, nullptr);
|
||||
}
|
||||
////////////////
|
||||
private:
|
||||
int64_t size_ = 0;
|
||||
|
||||
@ -165,7 +165,9 @@ class ResourceMgrAdvanceTest : public testing::Test {
|
||||
SetUp() override {
|
||||
mgr1_ = std::make_shared<ResourceMgr>();
|
||||
disk_res = std::make_shared<DiskResource>("disk", 0, true, false);
|
||||
cpu_res = std::make_shared<CpuResource>("cpu", 0, true, true);
|
||||
mgr1_->Add(ResourcePtr(disk_res));
|
||||
mgr1_->Add(ResourcePtr(cpu_res));
|
||||
mgr1_->Start();
|
||||
}
|
||||
|
||||
@ -176,6 +178,7 @@ class ResourceMgrAdvanceTest : public testing::Test {
|
||||
|
||||
ResourceMgrPtr mgr1_;
|
||||
ResourcePtr disk_res;
|
||||
ResourcePtr cpu_res;
|
||||
};
|
||||
|
||||
TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) {
|
||||
|
||||
@ -28,18 +28,17 @@
|
||||
#include "utils/Error.h"
|
||||
#include "wrapper/VecIndex.h"
|
||||
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
class MockVecIndex : public engine::VecIndex {
|
||||
public:
|
||||
virtual Status BuildAll(const int64_t &nb,
|
||||
const float *xb,
|
||||
const int64_t *ids,
|
||||
const engine::Config &cfg,
|
||||
const int64_t &nt = 0,
|
||||
const float *xt = nullptr) {
|
||||
virtual Status BuildAll(const int64_t& nb,
|
||||
const float* xb,
|
||||
const int64_t* ids,
|
||||
const engine::Config& cfg,
|
||||
const int64_t& nt = 0,
|
||||
const float* xt = nullptr) {
|
||||
}
|
||||
|
||||
engine::VecIndexPtr Clone() override {
|
||||
@ -54,23 +53,23 @@ class MockVecIndex : public engine::VecIndex {
|
||||
return engine::IndexType::INVALID;
|
||||
}
|
||||
|
||||
virtual Status Add(const int64_t &nb,
|
||||
const float *xb,
|
||||
const int64_t *ids,
|
||||
const engine::Config &cfg = engine::Config()) {
|
||||
virtual Status Add(const int64_t& nb,
|
||||
const float* xb,
|
||||
const int64_t* ids,
|
||||
const engine::Config& cfg = engine::Config()) {
|
||||
}
|
||||
|
||||
virtual Status Search(const int64_t &nq,
|
||||
const float *xq,
|
||||
float *dist,
|
||||
int64_t *ids,
|
||||
const engine::Config &cfg = engine::Config()) {
|
||||
virtual Status Search(const int64_t& nq,
|
||||
const float* xq,
|
||||
float* dist,
|
||||
int64_t* ids,
|
||||
const engine::Config& cfg = engine::Config()) {
|
||||
}
|
||||
|
||||
engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
|
||||
engine::VecIndexPtr CopyToGpu(const int64_t& device_id, const engine::Config& cfg) override {
|
||||
}
|
||||
|
||||
engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
|
||||
engine::VecIndexPtr CopyToCpu(const engine::Config& cfg) override {
|
||||
}
|
||||
|
||||
virtual int64_t Dimension() {
|
||||
@ -86,7 +85,7 @@ class MockVecIndex : public engine::VecIndex {
|
||||
return binset;
|
||||
}
|
||||
|
||||
virtual Status Load(const knowhere::BinarySet &index_binary) {
|
||||
virtual Status Load(const knowhere::BinarySet& index_binary) {
|
||||
}
|
||||
|
||||
public:
|
||||
@ -102,11 +101,13 @@ class SchedulerTest : public testing::Test {
|
||||
cache::GpuCacheMgr::GetInstance(0)->SetCapacity(cache_cap);
|
||||
cache::GpuCacheMgr::GetInstance(1)->SetCapacity(cache_cap);
|
||||
|
||||
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
|
||||
ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
|
||||
ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
|
||||
ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
|
||||
|
||||
res_mgr_ = std::make_shared<ResourceMgr>();
|
||||
disk_resource_ = res_mgr_->Add(std::move(disk));
|
||||
cpu_resource_ = res_mgr_->Add(std::move(cpu));
|
||||
gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
|
||||
gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
|
||||
@ -127,6 +128,7 @@ class SchedulerTest : public testing::Test {
|
||||
res_mgr_->Stop();
|
||||
}
|
||||
|
||||
ResourceWPtr disk_resource_;
|
||||
ResourceWPtr cpu_resource_;
|
||||
ResourceWPtr gpu_resource_0_;
|
||||
ResourceWPtr gpu_resource_1_;
|
||||
@ -137,7 +139,7 @@ class SchedulerTest : public testing::Test {
|
||||
|
||||
void
|
||||
insert_dummy_index_into_gpu_cache(uint64_t device_id) {
|
||||
MockVecIndex *mock_index = new MockVecIndex();
|
||||
MockVecIndex* mock_index = new MockVecIndex();
|
||||
mock_index->ntotal_ = 1000;
|
||||
engine::VecIndexPtr index(mock_index);
|
||||
|
||||
@ -224,6 +226,7 @@ class SchedulerTest2 : public testing::Test {
|
||||
TearDown() override {
|
||||
scheduler_->Stop();
|
||||
res_mgr_->Stop();
|
||||
res_mgr_->Clear();
|
||||
}
|
||||
|
||||
ResourceWPtr disk_;
|
||||
@ -237,22 +240,22 @@ class SchedulerTest2 : public testing::Test {
|
||||
std::shared_ptr<Scheduler> scheduler_;
|
||||
};
|
||||
|
||||
TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) {
|
||||
const uint64_t NUM = 10;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
|
||||
dummy->location_ = "location";
|
||||
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
auto label = std::make_shared<DefaultLabel>();
|
||||
std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy, label);
|
||||
task->label() = std::make_shared<SpecResLabel>(disk_);
|
||||
tasks.push_back(task);
|
||||
disk_.lock()->task_table().Put(task);
|
||||
}
|
||||
//TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) {
|
||||
// const uint64_t NUM = 2;
|
||||
// std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
// TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
|
||||
// dummy->location_ = "location";
|
||||
//
|
||||
// for (uint64_t i = 0; i < NUM; ++i) {
|
||||
// auto label = std::make_shared<DefaultLabel>();
|
||||
// std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy, label);
|
||||
// task->label() = std::make_shared<SpecResLabel>(disk_);
|
||||
// tasks.push_back(task);
|
||||
// disk_.lock()->task_table().Put(task);
|
||||
// }
|
||||
|
||||
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
|
||||
}
|
||||
//}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user