mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Merge remote-tracking branch 'source/branch-0.4.0' into branch-0.4.0
Former-commit-id: d1768235b11458b3199b5b7beaa0836d4d7f791b
This commit is contained in:
commit
c118d15fa2
@ -17,8 +17,9 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-430 - Search no result if index created with FLAT
|
||||
- MS-443 - Create index hang again
|
||||
- MS-436 - Delete vectors failed if index created with index_type: IVF_FLAT/IVF_SQ8
|
||||
- MS-450 - server hang after run stop_server.sh
|
||||
- MS-449 - Add vectors twice success, once with ids, the other no ids
|
||||
- MS-450 - server hang after run stop_server.sh
|
||||
- MS-458 - Keep building index for one file when no gpu resource
|
||||
- MS-461 - Mysql meta unittest failed
|
||||
- MS-462 - Run milvus server twices, should display error
|
||||
- MS-463 - Search timeout
|
||||
|
||||
@ -74,6 +74,9 @@ resource_config:
|
||||
device_id: 0
|
||||
enable_loader: true
|
||||
enable_executor: true
|
||||
gpu_resource_num: 2
|
||||
pinned_memory: 300
|
||||
temp_memory: 300
|
||||
|
||||
# gtx1660:
|
||||
# type: GPU
|
||||
|
||||
@ -26,6 +26,9 @@ class KnowhereException : public std::exception {
|
||||
};
|
||||
|
||||
|
||||
#define KNOHWERE_ERROR_MSG(MSG)\
|
||||
printf("%s", KnowhereException(MSG, __PRETTY_FUNCTION__, __FILE__, __LINE__).what())
|
||||
|
||||
#define KNOWHERE_THROW_MSG(MSG)\
|
||||
do {\
|
||||
throw KnowhereException(MSG, __PRETTY_FUNCTION__, __FILE__, __LINE__);\
|
||||
|
||||
@ -8,6 +8,18 @@
|
||||
namespace zilliz {
|
||||
namespace knowhere {
|
||||
|
||||
struct Resource {
|
||||
Resource(std::shared_ptr<faiss::gpu::StandardGpuResources> &r): faiss_res(r) {
|
||||
static int64_t global_id = 0;
|
||||
id = global_id++;
|
||||
}
|
||||
|
||||
std::shared_ptr<faiss::gpu::StandardGpuResources> faiss_res;
|
||||
int64_t id;
|
||||
};
|
||||
using ResPtr = std::shared_ptr<Resource>;
|
||||
using ResWPtr = std::weak_ptr<Resource>;
|
||||
|
||||
class FaissGpuResourceMgr {
|
||||
public:
|
||||
struct DeviceParams {
|
||||
@ -17,14 +29,11 @@ class FaissGpuResourceMgr {
|
||||
};
|
||||
|
||||
public:
|
||||
using ResPtr = std::shared_ptr<faiss::gpu::StandardGpuResources>;
|
||||
using ResWPtr = std::weak_ptr<faiss::gpu::StandardGpuResources>;
|
||||
|
||||
static FaissGpuResourceMgr &
|
||||
GetInstance();
|
||||
|
||||
void
|
||||
AllocateTempMem(ResPtr &res, const int64_t& device_id, const int64_t& size);
|
||||
AllocateTempMem(ResPtr &resource, const int64_t& device_id, const int64_t& size);
|
||||
|
||||
void
|
||||
InitDevice(int64_t device_id,
|
||||
@ -32,12 +41,23 @@ class FaissGpuResourceMgr {
|
||||
int64_t temp_mem_size = 0,
|
||||
int64_t res_num = 2);
|
||||
|
||||
void InitResource();
|
||||
void
|
||||
InitResource();
|
||||
|
||||
ResPtr GetRes(const int64_t &device_id, const int64_t& alloc_size = 0);
|
||||
// allocate gpu memory invoke by build or copy_to_gpu
|
||||
ResPtr
|
||||
GetRes(const int64_t &device_id, const int64_t& alloc_size = 0);
|
||||
|
||||
void MoveToInuse(const int64_t &device_id, const ResPtr& res);
|
||||
void MoveToIdle(const int64_t &device_id, const ResPtr& res);
|
||||
// allocate gpu memory before search
|
||||
// this func will return True if the device is idle and exists an idle resource.
|
||||
bool
|
||||
GetRes(const int64_t& device_id, ResPtr &res, const int64_t& alloc_size = 0);
|
||||
|
||||
void
|
||||
MoveToInuse(const int64_t &device_id, const ResPtr& res);
|
||||
|
||||
void
|
||||
MoveToIdle(const int64_t &device_id, const ResPtr& res);
|
||||
|
||||
protected:
|
||||
bool is_init = false;
|
||||
@ -50,23 +70,24 @@ class FaissGpuResourceMgr {
|
||||
|
||||
class ResScope {
|
||||
public:
|
||||
ResScope(const int64_t device_id,std::shared_ptr<faiss::gpu::StandardGpuResources> &res) : resource(res), device_id(device_id) {
|
||||
ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id) {
|
||||
FaissGpuResourceMgr::GetInstance().MoveToInuse(device_id, resource);
|
||||
}
|
||||
|
||||
~ResScope() {
|
||||
resource->noTempMemory();
|
||||
//resource->faiss_res->noTempMemory();
|
||||
FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<faiss::gpu::StandardGpuResources> resource;
|
||||
ResPtr resource;
|
||||
int64_t device_id;
|
||||
};
|
||||
|
||||
class GPUIndex {
|
||||
public:
|
||||
explicit GPUIndex(const int &device_id) : gpu_id_(device_id) {};
|
||||
explicit GPUIndex(const int &device_id) : gpu_id_(device_id) {}
|
||||
GPUIndex(const int& device_id, ResPtr resource): gpu_id_(device_id), res_(std::move(resource)){}
|
||||
|
||||
virtual VectorIndexPtr CopyGpuToCpu(const Config &config) = 0;
|
||||
virtual VectorIndexPtr CopyGpuToGpu(const int64_t &device_id, const Config &config) = 0;
|
||||
@ -76,13 +97,14 @@ class GPUIndex {
|
||||
|
||||
protected:
|
||||
int64_t gpu_id_;
|
||||
ResPtr res_ = nullptr;
|
||||
};
|
||||
|
||||
class GPUIVF : public IVF, public GPUIndex {
|
||||
public:
|
||||
explicit GPUIVF(const int &device_id) : IVF(), GPUIndex(device_id) {}
|
||||
explicit GPUIVF(std::shared_ptr<faiss::Index> index, const int64_t &device_id)
|
||||
: IVF(std::move(index)), GPUIndex(device_id) {};
|
||||
explicit GPUIVF(std::shared_ptr<faiss::Index> index, const int64_t &device_id, ResPtr &resource)
|
||||
: IVF(std::move(index)), GPUIndex(device_id, resource) {};
|
||||
IndexModelPtr Train(const DatasetPtr &dataset, const Config &config) override;
|
||||
void set_index_model(IndexModelPtr model) override;
|
||||
//DatasetPtr Search(const DatasetPtr &dataset, const Config &config) override;
|
||||
@ -107,7 +129,8 @@ class GPUIVF : public IVF, public GPUIndex {
|
||||
class GPUIVFSQ : public GPUIVF {
|
||||
public:
|
||||
explicit GPUIVFSQ(const int &device_id) : GPUIVF(device_id) {}
|
||||
explicit GPUIVFSQ(std::shared_ptr<faiss::Index> index, const int64_t& device_id) : GPUIVF(std::move(index),device_id) {};
|
||||
explicit GPUIVFSQ(std::shared_ptr<faiss::Index> index, const int64_t &device_id, ResPtr &resource)
|
||||
: GPUIVF(std::move(index), device_id, resource) {};
|
||||
IndexModelPtr Train(const DatasetPtr &dataset, const Config &config) override;
|
||||
|
||||
public:
|
||||
|
||||
@ -39,8 +39,8 @@ using IDMAPPtr = std::shared_ptr<IDMAP>;
|
||||
|
||||
class GPUIDMAP : public IDMAP, public GPUIndex {
|
||||
public:
|
||||
explicit GPUIDMAP(std::shared_ptr<faiss::Index> index, const int64_t &device_id)
|
||||
: IDMAP(std::move(index)), GPUIndex(device_id) {}
|
||||
explicit GPUIDMAP(std::shared_ptr<faiss::Index> index, const int64_t &device_id, ResPtr& res)
|
||||
: IDMAP(std::move(index)), GPUIndex(device_id, res) {}
|
||||
|
||||
VectorIndexPtr CopyGpuToCpu(const Config &config) override;
|
||||
float *GetRawVectors() override;
|
||||
|
||||
@ -31,26 +31,31 @@ IndexModelPtr GPUIVF::Train(const DatasetPtr &dataset, const Config &config) {
|
||||
|
||||
GETTENSOR(dataset)
|
||||
|
||||
auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_device);
|
||||
ResScope rs(gpu_device, res);
|
||||
faiss::gpu::GpuIndexIVFFlatConfig idx_config;
|
||||
idx_config.device = gpu_device;
|
||||
faiss::gpu::GpuIndexIVFFlat device_index(res.get(), dim, nlist, metric_type, idx_config);
|
||||
device_index.train(rows, (float *) p_data);
|
||||
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_device);
|
||||
if (temp_resource != nullptr) {
|
||||
ResScope rs(gpu_device, temp_resource );
|
||||
faiss::gpu::GpuIndexIVFFlatConfig idx_config;
|
||||
idx_config.device = gpu_device;
|
||||
faiss::gpu::GpuIndexIVFFlat device_index(temp_resource->faiss_res.get(), dim, nlist, metric_type, idx_config);
|
||||
device_index.train(rows, (float *) p_data);
|
||||
|
||||
std::shared_ptr<faiss::Index> host_index = nullptr;
|
||||
host_index.reset(faiss::gpu::index_gpu_to_cpu(&device_index));
|
||||
std::shared_ptr<faiss::Index> host_index = nullptr;
|
||||
host_index.reset(faiss::gpu::index_gpu_to_cpu(&device_index));
|
||||
|
||||
return std::make_shared<IVFIndexModel>(host_index);
|
||||
return std::make_shared<IVFIndexModel>(host_index);
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("Build IVF can't get gpu resource");
|
||||
}
|
||||
}
|
||||
|
||||
void GPUIVF::set_index_model(IndexModelPtr model) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
|
||||
auto host_index = std::static_pointer_cast<IVFIndexModel>(model);
|
||||
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
|
||||
ResScope rs(gpu_id_, res);
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(res.get(), gpu_id_, host_index->index_.get());
|
||||
if (auto gpures = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
|
||||
ResScope rs(gpu_id_, gpures);
|
||||
res_ = gpures;
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(res_->faiss_res.get(), gpu_id_, host_index->index_.get());
|
||||
index_.reset(device_index);
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("load index model error, can't get gpu_resource");
|
||||
@ -94,9 +99,10 @@ void GPUIVF::LoadImpl(const BinarySet &index_binary) {
|
||||
|
||||
faiss::Index *index = faiss::read_index(&reader);
|
||||
|
||||
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
|
||||
ResScope rs(gpu_id_, res);
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(res.get(), gpu_id_, index);
|
||||
if (auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
|
||||
ResScope rs(gpu_id_, temp_res);
|
||||
res_ = temp_res;
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(res_->faiss_res.get(), gpu_id_, index);
|
||||
index_.reset(device_index);
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("Load error, can't get gpu resource");
|
||||
@ -123,14 +129,21 @@ void GPUIVF::search_impl(int64_t n,
|
||||
float *distances,
|
||||
int64_t *labels,
|
||||
const Config &cfg) {
|
||||
if (auto device_index = std::static_pointer_cast<faiss::gpu::GpuIndexIVF>(index_)) {
|
||||
// todo: allocate search memory
|
||||
auto nprobe = cfg.get_with_default("nprobe", size_t(1));
|
||||
// TODO(linxj): allocate mem
|
||||
if (FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_, res_)) {
|
||||
ResScope rs(gpu_id_, res_);
|
||||
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
device_index->setNumProbes(nprobe);
|
||||
device_index->search(n, (float *) data, k, distances, labels);
|
||||
if (auto device_index = std::static_pointer_cast<faiss::gpu::GpuIndexIVF>(index_)) {
|
||||
auto nprobe = cfg.get_with_default("nprobe", size_t(1));
|
||||
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
device_index->setNumProbes(nprobe);
|
||||
device_index->search(n, (float *) data, k, distances, labels);
|
||||
}
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("search can't get gpu resource");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
VectorIndexPtr GPUIVF::CopyGpuToCpu(const Config &config) {
|
||||
@ -165,6 +178,7 @@ IndexModelPtr GPUIVFPQ::Train(const DatasetPtr &dataset, const Config &config) {
|
||||
GETTENSOR(dataset)
|
||||
|
||||
// TODO(linxj): set device here.
|
||||
// TODO(linxj): set gpu resource here.
|
||||
faiss::gpu::StandardGpuResources res;
|
||||
faiss::gpu::GpuIndexIVFPQ device_index(&res, dim, nlist, M, nbits, metric_type);
|
||||
device_index.train(rows, (float *) p_data);
|
||||
@ -202,17 +216,23 @@ IndexModelPtr GPUIVFSQ::Train(const DatasetPtr &dataset, const Config &config) {
|
||||
index_type << "IVF" << nlist << "," << "SQ" << nbits;
|
||||
auto build_index = faiss::index_factory(dim, index_type.str().c_str(), metric_type);
|
||||
|
||||
faiss::gpu::StandardGpuResources res;
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(&res, gpu_num, build_index);
|
||||
device_index->train(rows, (float *) p_data);
|
||||
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_num);
|
||||
if (temp_resource != nullptr) {
|
||||
ResScope rs(gpu_num, temp_resource );
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(temp_resource->faiss_res.get(), gpu_num, build_index);
|
||||
device_index->train(rows, (float *) p_data);
|
||||
|
||||
std::shared_ptr<faiss::Index> host_index = nullptr;
|
||||
host_index.reset(faiss::gpu::index_gpu_to_cpu(device_index));
|
||||
std::shared_ptr<faiss::Index> host_index = nullptr;
|
||||
host_index.reset(faiss::gpu::index_gpu_to_cpu(device_index));
|
||||
|
||||
delete device_index;
|
||||
delete build_index;
|
||||
delete device_index;
|
||||
delete build_index;
|
||||
|
||||
return std::make_shared<IVFIndexModel>(host_index);
|
||||
return std::make_shared<IVFIndexModel>(host_index);
|
||||
}
|
||||
else {
|
||||
KNOWHERE_THROW_MSG("Build IVFSQ can't get gpu resource");
|
||||
}
|
||||
}
|
||||
|
||||
VectorIndexPtr GPUIVFSQ::CopyGpuToCpu(const Config &config) {
|
||||
@ -231,16 +251,16 @@ FaissGpuResourceMgr &FaissGpuResourceMgr::GetInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
void FaissGpuResourceMgr::AllocateTempMem(std::shared_ptr<faiss::gpu::StandardGpuResources> &res,
|
||||
void FaissGpuResourceMgr::AllocateTempMem(ResPtr &resource,
|
||||
const int64_t &device_id,
|
||||
const int64_t &size) {
|
||||
if (size) {
|
||||
res->setTempMemory(size);
|
||||
resource->faiss_res->setTempMemory(size);
|
||||
}
|
||||
else {
|
||||
auto search = devices_params_.find(device_id);
|
||||
if (search != devices_params_.end()) {
|
||||
res->setTempMemory(search->second.temp_mem_size);
|
||||
resource->faiss_res->setTempMemory(search->second.temp_mem_size);
|
||||
}
|
||||
// else do nothing. allocate when use.
|
||||
}
|
||||
@ -264,14 +284,19 @@ void FaissGpuResourceMgr::InitResource() {
|
||||
|
||||
for (int i = 0; i < device.second.resource_num; ++i) {
|
||||
auto res = std::make_shared<faiss::gpu::StandardGpuResources>();
|
||||
res->noTempMemory();
|
||||
resource_vec.push_back(res);
|
||||
|
||||
// TODO(linxj): enable set pinned memory
|
||||
//res->noTempMemory();
|
||||
auto res_wrapper = std::make_shared<Resource>(res);
|
||||
AllocateTempMem(res_wrapper, device.first, 0);
|
||||
|
||||
resource_vec.emplace_back(res_wrapper);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<faiss::gpu::StandardGpuResources> FaissGpuResourceMgr::GetRes(const int64_t &device_id,
|
||||
const int64_t &alloc_size) {
|
||||
ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
|
||||
const int64_t &alloc_size) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
|
||||
if (!is_init) {
|
||||
@ -282,21 +307,48 @@ std::shared_ptr<faiss::gpu::StandardGpuResources> FaissGpuResourceMgr::GetRes(co
|
||||
auto search = idle_.find(device_id);
|
||||
if (search != idle_.end()) {
|
||||
auto res = search->second.back();
|
||||
AllocateTempMem(res, device_id, alloc_size);
|
||||
//AllocateTempMem(res, device_id, alloc_size);
|
||||
|
||||
search->second.pop_back();
|
||||
return res;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void FaissGpuResourceMgr::MoveToInuse(const int64_t &device_id, const std::shared_ptr<faiss::gpu::StandardGpuResources> &res) {
|
||||
bool FaissGpuResourceMgr::GetRes(const int64_t &device_id,
|
||||
ResPtr &res,
|
||||
const int64_t &alloc_size) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
|
||||
if (!is_init) {
|
||||
InitResource();
|
||||
is_init = true;
|
||||
}
|
||||
|
||||
auto search = idle_.find(device_id);
|
||||
if (search != idle_.end()) {
|
||||
auto &res_vec = search->second;
|
||||
for (auto it = res_vec.cbegin(); it != res_vec.cend(); ++it) {
|
||||
if ((*it)->id == res->id) {
|
||||
//AllocateTempMem(res, device_id, alloc_size);
|
||||
res_vec.erase(it);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// else
|
||||
return false;
|
||||
}
|
||||
|
||||
void FaissGpuResourceMgr::MoveToInuse(const int64_t &device_id, const ResPtr &res) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
in_use_[device_id].push_back(res);
|
||||
}
|
||||
|
||||
void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const std::shared_ptr<faiss::gpu::StandardGpuResources> &res) {
|
||||
void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
idle_[device_id].push_back(res);
|
||||
auto it = idle_[device_id].begin();
|
||||
idle_[device_id].insert(it, res);
|
||||
}
|
||||
|
||||
void GPUIndex::SetGpuDevice(const int &gpu_id) {
|
||||
|
||||
@ -135,11 +135,11 @@ VectorIndexPtr IDMAP::Clone() {
|
||||
VectorIndexPtr IDMAP::CopyCpuToGpu(const int64_t &device_id, const Config &config) {
|
||||
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)){
|
||||
ResScope rs(device_id, res);
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res.get(), device_id, index_.get());
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get());
|
||||
|
||||
std::shared_ptr<faiss::Index> device_index;
|
||||
device_index.reset(gpu_index);
|
||||
return std::make_shared<GPUIDMAP>(device_index, device_id);
|
||||
return std::make_shared<GPUIDMAP>(device_index, device_id, res);
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
|
||||
}
|
||||
@ -204,7 +204,7 @@ void GPUIDMAP::LoadImpl(const BinarySet &index_binary) {
|
||||
|
||||
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_) ){
|
||||
ResScope rs(gpu_id_, res);
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(res.get(), gpu_id_, index);
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index);
|
||||
index_.reset(device_index);
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("Load error, can't get gpu resource");
|
||||
|
||||
@ -197,11 +197,11 @@ void IVF::search_impl(int64_t n,
|
||||
VectorIndexPtr IVF::CopyCpuToGpu(const int64_t& device_id, const Config &config) {
|
||||
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)){
|
||||
ResScope rs(device_id, res);
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res.get(), device_id, index_.get());
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get());
|
||||
|
||||
std::shared_ptr<faiss::Index> device_index;
|
||||
device_index.reset(gpu_index);
|
||||
return std::make_shared<GPUIVF>(device_index, device_id);
|
||||
return std::make_shared<GPUIVF>(device_index, device_id, res);
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
|
||||
}
|
||||
@ -275,11 +275,11 @@ VectorIndexPtr IVFSQ::CopyCpuToGpu(const int64_t &device_id, const Config &confi
|
||||
faiss::gpu::GpuClonerOptions option;
|
||||
option.allInGpu = true;
|
||||
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res.get(), device_id, index_.get(), &option);
|
||||
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get(), &option);
|
||||
|
||||
std::shared_ptr<faiss::Index> device_index;
|
||||
device_index.reset(gpu_index);
|
||||
return std::make_shared<GPUIVFSQ>(device_index, device_id);
|
||||
return std::make_shared<GPUIVFSQ>(device_index, device_id, res);
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource");
|
||||
}
|
||||
@ -350,12 +350,16 @@ void BasicIndex::LoadImpl(const BinarySet &index_binary) {
|
||||
}
|
||||
|
||||
void BasicIndex::SealImpl() {
|
||||
// TODO(linxj): enable
|
||||
//#ifdef ZILLIZ_FAISS
|
||||
faiss::Index *index = index_.get();
|
||||
auto idx = dynamic_cast<faiss::IndexIVF *>(index);
|
||||
if (idx != nullptr) {
|
||||
idx->to_readonly();
|
||||
}
|
||||
//else {
|
||||
// KNOHWERE_ERROR_MSG("Seal failed");
|
||||
//}
|
||||
//#endif
|
||||
}
|
||||
|
||||
|
||||
@ -18,9 +18,11 @@
|
||||
|
||||
using namespace zilliz::knowhere;
|
||||
|
||||
static int device_id = 0;
|
||||
class IDMAPTest : public DataGen, public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {
|
||||
FaissGpuResourceMgr::GetInstance().InitDevice(device_id, 1024*1024*200, 1024*1024*300, 2);
|
||||
Init_with_default();
|
||||
index_ = std::make_shared<IDMAP>();
|
||||
}
|
||||
|
||||
@ -8,6 +8,12 @@
|
||||
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
|
||||
#include <faiss/AutoTune.h>
|
||||
#include <faiss/gpu/GpuAutoTune.h>
|
||||
#include <faiss/gpu/GpuIndexIVFFlat.h>
|
||||
#include <faiss/gpu/GpuClonerOptions.h>
|
||||
|
||||
#include "knowhere/index/vector_index/gpu_ivf.h"
|
||||
#include "knowhere/index/vector_index/ivf.h"
|
||||
@ -25,7 +31,7 @@ using ::testing::TestWithParam;
|
||||
using ::testing::Values;
|
||||
using ::testing::Combine;
|
||||
|
||||
static int device_id = 1;
|
||||
static int device_id = 0;
|
||||
IVFIndexPtr IndexFactory(const std::string &type) {
|
||||
if (type == "IVF") {
|
||||
return std::make_shared<IVF>();
|
||||
@ -50,7 +56,7 @@ class IVFTest
|
||||
//Init_with_default();
|
||||
Generate(128, 1000000/5, 10);
|
||||
index_ = IndexFactory(index_type);
|
||||
FaissGpuResourceMgr::GetInstance().InitDevice(device_id);
|
||||
FaissGpuResourceMgr::GetInstance().InitDevice(device_id, 1024*1024*200, 1024*1024*300, 2);
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -343,4 +349,213 @@ TEST_P(IVFTest, seal_test) {
|
||||
ASSERT_GE(without_seal, with_seal);
|
||||
}
|
||||
|
||||
|
||||
class GPURESTEST
|
||||
: public DataGen, public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {
|
||||
//std::tie(index_type, preprocess_cfg, train_cfg, add_cfg, search_cfg) = GetParam();
|
||||
//Init_with_default();
|
||||
Generate(128, 1000000, 1000);
|
||||
k = 100;
|
||||
//index_ = IndexFactory(index_type);
|
||||
FaissGpuResourceMgr::GetInstance().InitDevice(device_id, 1024*1024*200, 1024*1024*300, 2);
|
||||
|
||||
elems = nq * k;
|
||||
ids = (int64_t *) malloc(sizeof(int64_t) * elems);
|
||||
dis = (float *) malloc(sizeof(float) * elems);
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
delete ids;
|
||||
delete dis;
|
||||
}
|
||||
|
||||
protected:
|
||||
std::string index_type;
|
||||
Config preprocess_cfg;
|
||||
Config train_cfg;
|
||||
Config add_cfg;
|
||||
Config search_cfg;
|
||||
IVFIndexPtr index_ = nullptr;
|
||||
|
||||
int64_t *ids = nullptr;
|
||||
float *dis = nullptr;
|
||||
int64_t elems = 0;
|
||||
};
|
||||
|
||||
const int search_count = 100;
|
||||
const int load_count = 30;
|
||||
|
||||
TEST_F(GPURESTEST, gpu_ivf_resource_test) {
|
||||
assert(!xb.empty());
|
||||
|
||||
|
||||
{
|
||||
index_type = "GPUIVF";
|
||||
index_ = IndexFactory(index_type);
|
||||
auto preprocessor = index_->BuildPreprocessor(base_dataset, preprocess_cfg);
|
||||
index_->set_preprocessor(preprocessor);
|
||||
train_cfg = Config::object{{"nlist", 1638}, {"gpu_id", device_id}, {"metric_type", "L2"}};
|
||||
auto model = index_->Train(base_dataset, train_cfg);
|
||||
index_->set_index_model(model);
|
||||
index_->Add(base_dataset, add_cfg);
|
||||
EXPECT_EQ(index_->Count(), nb);
|
||||
EXPECT_EQ(index_->Dimension(), dim);
|
||||
|
||||
search_cfg = Config::object{{"k", k}};
|
||||
TimeRecorder tc("knowere GPUIVF");
|
||||
for (int i = 0; i < search_count; ++i) {
|
||||
index_->Search(query_dataset, search_cfg);
|
||||
if (i > search_count - 6 || i < 5)
|
||||
tc.RecordSection("search once");
|
||||
}
|
||||
tc.RecordSection("search all");
|
||||
}
|
||||
|
||||
{
|
||||
// IVF-Search
|
||||
faiss::gpu::StandardGpuResources res;
|
||||
faiss::gpu::GpuIndexIVFFlatConfig idx_config;
|
||||
idx_config.device = device_id;
|
||||
faiss::gpu::GpuIndexIVFFlat device_index(&res, dim, 1638, faiss::METRIC_L2, idx_config);
|
||||
device_index.train(nb, xb.data());
|
||||
device_index.add(nb, xb.data());
|
||||
|
||||
TimeRecorder tc("ori IVF");
|
||||
for (int i = 0; i < search_count; ++i) {
|
||||
device_index.search(nq, xq.data(), k, dis, ids);
|
||||
if (i > search_count - 6 || i < 5)
|
||||
tc.RecordSection("search once");
|
||||
}
|
||||
tc.RecordSection("search all");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST_F(GPURESTEST, gpuivfsq) {
|
||||
{
|
||||
// knowhere gpu ivfsq
|
||||
index_type = "GPUIVFSQ";
|
||||
index_ = IndexFactory(index_type);
|
||||
auto preprocessor = index_->BuildPreprocessor(base_dataset, preprocess_cfg);
|
||||
index_->set_preprocessor(preprocessor);
|
||||
train_cfg = Config::object{{"gpu_id", device_id}, {"nlist", 1638}, {"nbits", 8}, {"metric_type", "L2"}};
|
||||
auto model = index_->Train(base_dataset, train_cfg);
|
||||
index_->set_index_model(model);
|
||||
index_->Add(base_dataset, add_cfg);
|
||||
search_cfg = Config::object{{"k", k}};
|
||||
auto result = index_->Search(query_dataset, search_cfg);
|
||||
AssertAnns(result, nq, k);
|
||||
|
||||
auto cpu_idx = CopyGpuToCpu(index_, Config());
|
||||
cpu_idx->Seal();
|
||||
|
||||
TimeRecorder tc("knowhere GPUSQ8");
|
||||
auto search_idx = CopyCpuToGpu(cpu_idx, device_id, Config());
|
||||
tc.RecordSection("Copy to gpu");
|
||||
for (int i = 0; i < search_count; ++i) {
|
||||
search_idx->Search(query_dataset, search_cfg);
|
||||
if (i > search_count - 6 || i < 5)
|
||||
tc.RecordSection("search once");
|
||||
}
|
||||
tc.RecordSection("search all");
|
||||
}
|
||||
|
||||
{
|
||||
// Ori gpuivfsq Test
|
||||
const char *index_description = "IVF1638,SQ8";
|
||||
faiss::Index *ori_index = faiss::index_factory(dim, index_description, faiss::METRIC_L2);
|
||||
|
||||
faiss::gpu::StandardGpuResources res;
|
||||
auto device_index = faiss::gpu::index_cpu_to_gpu(&res, device_id, ori_index);
|
||||
device_index->train(nb, xb.data());
|
||||
device_index->add(nb, xb.data());
|
||||
|
||||
auto cpu_index = faiss::gpu::index_gpu_to_cpu(device_index);
|
||||
auto idx = dynamic_cast<faiss::IndexIVF *>(cpu_index);
|
||||
if (idx != nullptr) {
|
||||
idx->to_readonly();
|
||||
}
|
||||
delete device_index;
|
||||
delete ori_index;
|
||||
|
||||
faiss::gpu::GpuClonerOptions option;
|
||||
option.allInGpu = true;
|
||||
|
||||
TimeRecorder tc("ori GPUSQ8");
|
||||
faiss::Index *search_idx = faiss::gpu::index_cpu_to_gpu(&res, device_id, cpu_index, &option);
|
||||
tc.RecordSection("Copy to gpu");
|
||||
for (int i = 0; i < search_count; ++i) {
|
||||
search_idx->search(nq, xq.data(), k, dis, ids);
|
||||
if (i > search_count - 6 || i < 5)
|
||||
tc.RecordSection("search once");
|
||||
}
|
||||
tc.RecordSection("search all");
|
||||
delete cpu_index;
|
||||
delete search_idx;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST_F(GPURESTEST, copyandsearch) {
|
||||
printf("==================\n");
|
||||
|
||||
// search and copy at the same time
|
||||
index_type = "GPUIVFSQ";
|
||||
//index_type = "GPUIVF";
|
||||
index_ = IndexFactory(index_type);
|
||||
auto preprocessor = index_->BuildPreprocessor(base_dataset, preprocess_cfg);
|
||||
index_->set_preprocessor(preprocessor);
|
||||
train_cfg = Config::object{{"gpu_id", device_id}, {"nlist", 1638}, {"nbits", 8}, {"metric_type", "L2"}};
|
||||
auto model = index_->Train(base_dataset, train_cfg);
|
||||
index_->set_index_model(model);
|
||||
index_->Add(base_dataset, add_cfg);
|
||||
search_cfg = Config::object{{"k", k}};
|
||||
auto result = index_->Search(query_dataset, search_cfg);
|
||||
AssertAnns(result, nq, k);
|
||||
|
||||
auto cpu_idx = CopyGpuToCpu(index_, Config());
|
||||
cpu_idx->Seal();
|
||||
|
||||
auto search_idx = CopyCpuToGpu(cpu_idx, device_id, Config());
|
||||
|
||||
auto search_func = [&] {
|
||||
//TimeRecorder tc("search&load");
|
||||
for (int i = 0; i < search_count; ++i) {
|
||||
search_idx->Search(query_dataset, search_cfg);
|
||||
//if (i > search_count - 6 || i == 0)
|
||||
// tc.RecordSection("search once");
|
||||
}
|
||||
//tc.ElapseFromBegin("search finish");
|
||||
};
|
||||
auto load_func = [&] {
|
||||
//TimeRecorder tc("search&load");
|
||||
for (int i = 0; i < load_count; ++i) {
|
||||
CopyCpuToGpu(cpu_idx, device_id, Config());
|
||||
//if (i > load_count -5 || i < 5)
|
||||
//tc.RecordSection("Copy to gpu");
|
||||
}
|
||||
//tc.ElapseFromBegin("load finish");
|
||||
};
|
||||
|
||||
TimeRecorder tc("basic");
|
||||
CopyCpuToGpu(cpu_idx, device_id, Config());
|
||||
tc.RecordSection("Copy to gpu once");
|
||||
search_idx->Search(query_dataset, search_cfg);
|
||||
tc.RecordSection("search once");
|
||||
search_func();
|
||||
tc.RecordSection("only search total");
|
||||
load_func();
|
||||
tc.RecordSection("only copy total");
|
||||
|
||||
std::thread search_thread(search_func);
|
||||
std::thread load_thread(load_func);
|
||||
search_thread.join();
|
||||
load_thread.join();
|
||||
tc.RecordSection("Copy&search total");
|
||||
}
|
||||
|
||||
|
||||
|
||||
// TODO(linxj): Add exception test
|
||||
|
||||
@ -657,7 +657,6 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
|
||||
status = BackgroundMergeFiles(table_id);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
|
||||
continue;//let other table get chance to merge
|
||||
}
|
||||
|
||||
if (shutting_down_.load(std::memory_order_acquire)){
|
||||
@ -830,7 +829,6 @@ void DBImpl::BackgroundBuildIndex() {
|
||||
status = BuildIndex(file);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
|
||||
return;
|
||||
}
|
||||
|
||||
if (shutting_down_.load(std::memory_order_acquire)){
|
||||
|
||||
@ -31,6 +31,9 @@ StartSchedulerService() {
|
||||
auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
|
||||
auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
|
||||
auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
|
||||
auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
|
||||
auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
|
||||
auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM);
|
||||
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname,
|
||||
type,
|
||||
@ -38,7 +41,9 @@ StartSchedulerService() {
|
||||
enable_loader,
|
||||
enable_executor));
|
||||
|
||||
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id);
|
||||
pinned_memory = 1024 * 1024 * pinned_memory;
|
||||
temp_memory = 1024 * 1024 * temp_memory;
|
||||
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id, pinned_memory, temp_memory, resource_num);
|
||||
}
|
||||
|
||||
knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
|
||||
|
||||
@ -56,6 +56,9 @@ static const char* CONFIG_RESOURCE_MEMORY = "memory";
|
||||
static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id";
|
||||
static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader";
|
||||
static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor";
|
||||
static const char* CONFIG_RESOURCE_NUM = "gpu_resource_num";
|
||||
static const char* CONFIG_RESOURCE_PIN_MEMORY = "pinned_memory";
|
||||
static const char* CONFIG_RESOURCE_TEMP_MEMORY = "temp_memory";
|
||||
static const char* CONFIG_RESOURCE_CONNECTIONS = "connections";
|
||||
static const char* CONFIG_SPEED_CONNECTIONS = "speed";
|
||||
static const char* CONFIG_ENDPOINT_CONNECTIONS = "endpoint";
|
||||
|
||||
@ -29,7 +29,7 @@ class KnowhereWrapperTest
|
||||
: public TestWithParam<::std::tuple<IndexType, std::string, int, int, int, int, Config, Config>> {
|
||||
protected:
|
||||
void SetUp() override {
|
||||
zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID);
|
||||
zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID, 1024*1024*200, 1024*1024*300, 2);
|
||||
|
||||
std::string generator_type;
|
||||
std::tie(index_type, generator_type, dim, nb, nq, k, train_cfg, search_cfg) = GetParam();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user