Merge remote-tracking branch 'main/0.5.1' into 0.5.1
Former-commit-id: 6f882df8f6a9571e9c5bf6aab55a813dcfe9cbad
commit 881e962361
@@ -5,8 +5,11 @@ Please mark all change in change log and use the ticket from JIRA.
# Milvus 0.5.1 (TODO)

## Bug

## Feature
- \#90 - The server start error messages could be improved to enhance user experience
- \#104 - test_scheduler core dump
- \#115 - Using new structure for tasktable

## Improvement
- \#64 - Improvement dump function in scheduler
@@ -16,9 +19,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#96 - Remove .a file in milvus/lib for docker-version
- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
- \#122 - Add unique id for Job

## Feature
- \#115 - Using new structure for tasktable
- \#130 - Set task state MOVED after resource copy it completed

## Task

@@ -14,7 +14,7 @@ container('milvus-build-env') {
sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' \
&& export JFROG_USER_NAME='${USERNAME}' \
&& export JFROG_PASSWORD='${PASSWORD}' \
&& export FAISS_URL='http://192.168.1.105:6060/jinhai/faiss/-/archive/branch-0.2.1/faiss-branch-0.2.1.tar.gz' \
&& export FAISS_URL='http://192.168.1.105:6060/jinhai/faiss/-/archive/branch-0.3.0/faiss-branch-0.3.0.tar.gz' \
&& ./build.sh -t ${params.BUILD_TYPE} -d /opt/milvus -j -u -c"

sh "./coverage.sh -u root -p 123456 -t \$POD_IP"

@@ -14,7 +14,7 @@ container('milvus-build-env') {
sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' \
&& export JFROG_USER_NAME='${USERNAME}' \
&& export JFROG_PASSWORD='${PASSWORD}' \
&& export FAISS_URL='http://192.168.1.105:6060/jinhai/faiss/-/archive/branch-0.2.1/faiss-branch-0.2.1.tar.gz' \
&& export FAISS_URL='http://192.168.1.105:6060/jinhai/faiss/-/archive/branch-0.3.0/faiss-branch-0.3.0.tar.gz' \
&& ./build.sh -t ${params.BUILD_TYPE} -j -d /opt/milvus"
}
}

@@ -18,16 +18,13 @@
#include "db/engine/ExecutionEngineImpl.h"
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "knowhere/common/Config.h"
#include "metrics/Metrics.h"
#include "scheduler/Utils.h"
#include "server/Config.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"

#include "knowhere/common/Config.h"
#include "knowhere/common/Exception.h"
#include "knowhere/index/vector_index/IndexIVFSQHybrid.h"
#include "scheduler/Utils.h"
#include "server/Config.h"
#include "wrapper/ConfAdapter.h"
#include "wrapper/ConfAdapterMgr.h"
#include "wrapper/VecImpl.h"
@@ -260,6 +257,54 @@ ExecutionEngineImpl::Load(bool to_cache) {
Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
if (hybrid) {
const std::string key = location_ + ".quantizer";
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();

const int64_t NOT_FOUND = -1;
int64_t device_id = NOT_FOUND;

// cache hit
{
knowhere::QuantizerPtr quantizer = nullptr;

for (auto& gpu : gpus) {
auto cache = cache::GpuCacheMgr::GetInstance(gpu);
if (auto cached_quantizer = cache->GetIndex(key)) {
device_id = gpu;
quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
}
}

if (device_id != NOT_FOUND) {
// cache hit
auto config = std::make_shared<knowhere::QuantizerCfg>();
config->gpu_id = device_id;
config->mode = 2;
auto new_index = index_->LoadData(quantizer, config);
index_ = new_index;
}
}

if (device_id == NOT_FOUND) {
// cache miss
std::vector<int64_t> all_free_mem;
for (auto& gpu : gpus) {
auto cache = cache::GpuCacheMgr::GetInstance(gpu);
auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
all_free_mem.push_back(free_mem);
}

auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
auto best_index = std::distance(all_free_mem.begin(), max_e);
device_id = gpus[best_index];

auto pair = index_->CopyToGpuWithQuantizer(device_id);
index_ = pair.first;

// cache
auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
}
return Status::OK();
}
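
For reference, a minimal standalone sketch of the cache-miss branch above, which picks the GPU whose cache has the most free capacity. The GPU ids and free-memory numbers below are hypothetical stand-ins for scheduler::get_gpu_pool() and CacheCapacity() - CacheUsage(); none of this is Milvus API.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
        // Hypothetical GPU ids and free cache capacities (bytes).
        std::vector<uint64_t> gpus = {0, 1, 2};
        std::vector<int64_t> all_free_mem = {1 << 20, 4 << 20, 2 << 20};

        // Same selection idiom as the cache-miss branch: the GPU with the most
        // free cache memory receives the copied index and its quantizer.
        auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
        auto best_index = std::distance(all_free_mem.begin(), max_e);
        uint64_t device_id = gpus[best_index];

        std::cout << "chosen gpu: " << device_id << std::endl;  // prints 1
        return 0;
    }
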
@@ -244,11 +244,12 @@ if(CUSTOMIZATION)
# set(FAISS_MD5 "21deb1c708490ca40ecb899122c01403") # commit-id 643e48f479637fd947e7b93fa4ca72b38ecc9a39 branch-0.2.0
# set(FAISS_MD5 "072db398351cca6e88f52d743bbb9fa0") # commit-id 3a2344d04744166af41ef1a74449d68a315bfe17 branch-0.2.1
# set(FAISS_MD5 "c89ea8e655f5cdf58f42486f13614714") # commit-id 9c28a1cbb88f41fa03b03d7204106201ad33276b branch-0.2.1
set(FAISS_MD5 "87fdd86351ffcaf3f80dc26ade63c44b") # commit-id 841a156e67e8e22cd8088e1b58c00afbf2efc30b branch-0.2.1
# set(FAISS_MD5 "87fdd86351ffcaf3f80dc26ade63c44b") # commit-id 841a156e67e8e22cd8088e1b58c00afbf2efc30b branch-0.2.1
set(FAISS_MD5 "f3b2ce3364c3fa7febd3aa7fdd0fe380") # commit-id 694e03458e6b69ce8a62502f71f69a614af5af8f branch-0.3.0
endif()
else()
set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/v1.5.3.tar.gz")
set(FAISS_MD5 "0bc12737b23def156f6a1eb782050135")
set(FAISS_SOURCE_URL "https://github.com/milvus-io/faiss/archive/1.6.0.tar.gz")
set(FAISS_MD5 "eb96d84f98b078a9eec04a796f5c792e")
endif()
message(STATUS "FAISS URL = ${FAISS_SOURCE_URL}")

@@ -38,7 +38,7 @@ class FaissBaseIndex {
virtual void
SealImpl();

protected:
public:
std::shared_ptr<faiss::Index> index_ = nullptr;
};
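
A toy note on why the protected/public flips in this commit (the member access of FaissBaseIndex above, and the base class of IVF further down) matter: with protected access, code outside the class hierarchy cannot reach index_. The types below are illustrative stand-ins, not knowhere classes.

    #include <memory>

    struct ToyBase {
        std::shared_ptr<int> index_;  // plays the role of FaissBaseIndex::index_
    };

    struct DerivedProtected : protected ToyBase {};  // base members invisible to outside code
    struct DerivedPublic : public ToyBase {};        // base members stay publicly reachable

    int main() {
        DerivedPublic pub;
        pub.index_ = std::make_shared<int>(42);       // compiles: public inheritance

        DerivedProtected prot;
        // prot.index_ = std::make_shared<int>(42);   // would not compile: inaccessible
        return 0;
    }
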
@@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include <faiss/gpu/GpuAutoTune.h>
#include <faiss/gpu/GpuIndexFlat.h>
#include <memory>

#include <faiss/gpu/GpuCloner.h>
#include <faiss/gpu/GpuIndexIVF.h>
#include <faiss/gpu/GpuIndexIVFFlat.h>
#include <faiss/index_io.h>
#include <memory>

#include "knowhere/adapter/VectorAdapter.h"
#include "knowhere/common/Exception.h"
@@ -86,7 +86,8 @@ GPUIVF::SerializeImpl() {
faiss::Index* index = index_.get();
faiss::Index* host_index = faiss::gpu::index_gpu_to_cpu(index);

SealImpl();
// TODO(linxj): support seal
// SealImpl();

faiss::write_index(host_index, &writer);
delete host_index;
@@ -130,13 +131,12 @@ void
GPUIVF::search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& cfg) {
std::lock_guard<std::mutex> lk(mutex_);

// TODO(linxj): gpu index support GenParams
if (auto device_index = std::dynamic_pointer_cast<faiss::gpu::GpuIndexIVF>(index_)) {
auto search_cfg = std::dynamic_pointer_cast<IVFCfg>(cfg);
device_index->setNumProbes(search_cfg->nprobe);
device_index->nprobe = search_cfg->nprobe;
// assert(device_index->getNumProbes() == search_cfg->nprobe);

{
// TODO(linxj): allocate gpu mem
ResScope rs(res_, gpu_id_);
device_index->search(n, (float*)data, k, distances, labels);
}

@@ -16,8 +16,10 @@
// under the License.

#include <faiss/IndexIVFPQ.h>
#include <faiss/gpu/GpuAutoTune.h>
#include <faiss/gpu/GpuCloner.h>
#include <faiss/gpu/GpuIndexIVFPQ.h>
#include <faiss/index_factory.h>

#include <memory>

#include "knowhere/adapter/VectorAdapter.h"

@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.

#include <faiss/gpu/GpuAutoTune.h>
#include <faiss/gpu/GpuCloner.h>
#include <faiss/index_factory.h>

#include <memory>
#include <utility>

#include "knowhere/adapter/VectorAdapter.h"
#include "knowhere/common/Exception.h"
@@ -71,13 +72,4 @@ GPUIVFSQ::CopyGpuToCpu(const Config& config) {
return std::make_shared<IVFSQ>(new_index);
}

void
GPUIVFSQ::search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& cfg) {
#ifdef CUSTOMIZATION
GPUIVF::search_impl(n, data, k, distances, labels, cfg);
#else
IVF::search_impl(n, data, k, distances, labels, cfg);
#endif
}

} // namespace knowhere

@@ -38,10 +38,6 @@ class GPUIVFSQ : public GPUIVF {

VectorIndexPtr
CopyGpuToCpu(const Config& config) override;

protected:
void
search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& cfg) override;
};

} // namespace knowhere

@@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include <faiss/AutoTune.h>
#include <faiss/IndexFlat.h>
#include <faiss/MetaIndexes.h>
#include <faiss/gpu/GpuAutoTune.h>
#include <faiss/gpu/GpuCloner.h>
#include <faiss/index_factory.h>
#include <faiss/index_io.h>

#include <vector>

#include "knowhere/adapter/VectorAdapter.h"

@@ -15,15 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include <faiss/AutoTune.h>
#include <faiss/AuxIndexStructures.h>
#include <faiss/IVFlib.h>
#include <faiss/IndexFlat.h>
#include <faiss/IndexIVF.h>
#include <faiss/IndexIVFFlat.h>
#include <faiss/IndexIVFPQ.h>
#include <faiss/gpu/GpuAutoTune.h>
#include <faiss/index_io.h>
#include <faiss/gpu/GpuCloner.h>

#include <chrono>
#include <memory>
#include <utility>

@@ -30,7 +30,7 @@ namespace knowhere {

using Graph = std::vector<std::vector<int64_t>>;

class IVF : public VectorIndex, protected FaissBaseIndex {
class IVF : public VectorIndex, public FaissBaseIndex {
public:
IVF() : FaissBaseIndex(nullptr) {
}

@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include <faiss/gpu/GpuAutoTune.h>
#include <faiss/gpu/GpuCloner.h>
#include <faiss/index_factory.h>
#include <memory>

#include "knowhere/adapter/VectorAdapter.h"
@@ -56,14 +57,7 @@ IVFSQ::CopyCpuToGpu(const int64_t& device_id, const Config& config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) {
ResScope rs(res, device_id, false);

#ifdef CUSTOMIZATION
faiss::gpu::GpuClonerOptions option;
option.allInGpu = true;

auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get(), &option);
#else
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get());
#endif

std::shared_ptr<faiss::Index> device_index;
device_index.reset(gpu_index);

@@ -17,19 +17,25 @@
// under the License.

#include "knowhere/index/vector_index/IndexIVFSQHybrid.h"
#include <utility>
#include "faiss/AutoTune.h"
#include "faiss/gpu/GpuAutoTune.h"
#include "faiss/gpu/GpuIndexIVF.h"
#include "knowhere/adapter/VectorAdapter.h"
#include "knowhere/common/Exception.h"

#include <utility>

#include <faiss/gpu/GpuCloner.h>
#include <faiss/gpu/GpuIndexIVF.h>
#include <faiss/index_factory.h>

namespace knowhere {

#ifdef CUSTOMIZATION

// std::mutex g_mutex;

IndexModelPtr
IVFSQHybrid::Train(const DatasetPtr& dataset, const Config& config) {
// std::lock_guard<std::mutex> lk(g_mutex);

auto build_cfg = std::dynamic_pointer_cast<IVFSQCfg>(config);
if (build_cfg != nullptr) {
build_cfg->CheckValid(); // throw exception
@@ -63,23 +69,25 @@ IVFSQHybrid::Train(const DatasetPtr& dataset, const Config& config) {

VectorIndexPtr
IVFSQHybrid::CopyGpuToCpu(const Config& config) {
std::lock_guard<std::mutex> lk(mutex_);

if (auto device_idx = std::dynamic_pointer_cast<faiss::IndexIVF>(index_)) {
faiss::Index* device_index = index_.get();
faiss::Index* host_index = faiss::gpu::index_gpu_to_cpu(device_index);

std::shared_ptr<faiss::Index> new_index;
new_index.reset(host_index);
return std::make_shared<IVFSQHybrid>(new_index);
} else {
// TODO(linxj): why? jinhai
if (gpu_mode == 0) {
return std::make_shared<IVFSQHybrid>(index_);
}
std::lock_guard<std::mutex> lk(mutex_);

faiss::Index* device_index = index_.get();
faiss::Index* host_index = faiss::gpu::index_gpu_to_cpu(device_index);

std::shared_ptr<faiss::Index> new_index;
new_index.reset(host_index);
return std::make_shared<IVFSQHybrid>(new_index);
}

VectorIndexPtr
IVFSQHybrid::CopyCpuToGpu(const int64_t& device_id, const Config& config) {
if (gpu_mode != 0) {
KNOWHERE_THROW_MSG("Not a GpuIndex Type");
}

if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) {
ResScope rs(res, device_id, false);
faiss::gpu::GpuClonerOptions option;
@@ -105,16 +113,26 @@ IVFSQHybrid::LoadImpl(const BinarySet& index_binary) {
FaissBaseIndex::LoadImpl(index_binary); // load on cpu
auto* ivf_index = dynamic_cast<faiss::IndexIVF*>(index_.get());
ivf_index->backup_quantizer();
gpu_mode = 0;
}

void
IVFSQHybrid::search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels,
const Config& cfg) {
// std::lock_guard<std::mutex> lk(g_mutex);
// static int64_t search_count;
// ++search_count;

if (gpu_mode == 2) {
GPUIVF::search_impl(n, data, k, distances, labels, cfg);
} else if (gpu_mode == 1) {
ResScope rs(res_, gpu_id_);
IVF::search_impl(n, data, k, distances, labels, cfg);
// index_->search(n, (float*)data, k, distances, labels);
} else if (gpu_mode == 1) { // hybrid
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(quantizer_gpu_id_)) {
ResScope rs(res, quantizer_gpu_id_, true);
IVF::search_impl(n, data, k, distances, labels, cfg);
} else {
KNOWHERE_THROW_MSG("Hybrid Search Error, can't get gpu: " + std::to_string(quantizer_gpu_id_) + "resource");
}
} else if (gpu_mode == 0) {
IVF::search_impl(n, data, k, distances, labels, cfg);
}
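
Reading the branches above, gpu_mode appears to encode where the IVFSQHybrid index lives: 0 for plain CPU, 1 for the hybrid case (only the coarse quantizer on the GPU recorded in quantizer_gpu_id_), 2 for a fully GPU-resident index. The sketch below only mirrors that reading; the enum and helper are illustrative, not knowhere types.

    #include <iostream>
    #include <string>

    // 0, 1, 2 as used by IVFSQHybrid::gpu_mode in the diff above.
    enum class HybridMode { Cpu = 0, QuantizerOnGpu = 1, FullGpu = 2 };

    std::string DescribeSearchPath(HybridMode mode) {
        switch (mode) {
            case HybridMode::FullGpu:
                return "whole index on GPU -> GPUIVF::search_impl";
            case HybridMode::QuantizerOnGpu:
                return "quantizer on GPU (quantizer_gpu_id_), lists on CPU -> IVF::search_impl inside a ResScope";
            case HybridMode::Cpu:
                return "plain CPU index -> IVF::search_impl";
        }
        return "unknown";
    }

    int main() {
        std::cout << DescribeSearchPath(HybridMode::QuantizerOnGpu) << std::endl;
        return 0;
    }
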
@@ -122,16 +140,18 @@ IVFSQHybrid::search_impl(int64_t n, const float* data, int64_t k, float* distanc

QuantizerPtr
IVFSQHybrid::LoadQuantizer(const Config& conf) {
// std::lock_guard<std::mutex> lk(g_mutex);

auto quantizer_conf = std::dynamic_pointer_cast<QuantizerCfg>(conf);
if (quantizer_conf != nullptr) {
if (quantizer_conf->mode != 1) {
KNOWHERE_THROW_MSG("mode only support 1 in this func");
}
}
gpu_id_ = quantizer_conf->gpu_id;
auto gpu_id = quantizer_conf->gpu_id;

if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(res, gpu_id_, false);
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id)) {
ResScope rs(res, gpu_id, false);
faiss::gpu::GpuClonerOptions option;
option.allInGpu = true;

@@ -140,7 +160,7 @@ IVFSQHybrid::LoadQuantizer(const Config& conf) {
index_composition->quantizer = nullptr;
index_composition->mode = quantizer_conf->mode; // only 1

auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index_composition, &option);
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id, index_composition, &option);
delete gpu_index;

auto q = std::make_shared<FaissIVFQuantizer>();
@@ -148,16 +168,19 @@ IVFSQHybrid::LoadQuantizer(const Config& conf) {
auto& q_ptr = index_composition->quantizer;
q->size = q_ptr->d * q_ptr->getNumVecs() * sizeof(float);
q->quantizer = q_ptr;
q->gpu_id = gpu_id;
res_ = res;
gpu_mode = 1;
return q;
} else {
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu: " + std::to_string(gpu_id_) + "resource");
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu: " + std::to_string(gpu_id) + "resource");
}
}

void
IVFSQHybrid::SetQuantizer(const QuantizerPtr& q) {
// std::lock_guard<std::mutex> lk(g_mutex);

auto ivf_quantizer = std::dynamic_pointer_cast<FaissIVFQuantizer>(q);
if (ivf_quantizer == nullptr) {
KNOWHERE_THROW_MSG("Quantizer type error");
@@ -170,20 +193,27 @@ IVFSQHybrid::SetQuantizer(const QuantizerPtr& q) {
// delete ivf_index->quantizer;
ivf_index->quantizer = ivf_quantizer->quantizer;
}
quantizer_gpu_id_ = ivf_quantizer->gpu_id;
gpu_mode = 1;
}

void
IVFSQHybrid::UnsetQuantizer() {
// std::lock_guard<std::mutex> lk(g_mutex);

auto* ivf_index = dynamic_cast<faiss::IndexIVF*>(index_.get());
if (ivf_index == nullptr) {
KNOWHERE_THROW_MSG("Index type error");
}

ivf_index->quantizer = nullptr;
quantizer_gpu_id_ = -1;
}

VectorIndexPtr
IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
// std::lock_guard<std::mutex> lk(g_mutex);

auto quantizer_conf = std::dynamic_pointer_cast<QuantizerCfg>(conf);
if (quantizer_conf != nullptr) {
if (quantizer_conf->mode != 2) {
@@ -192,13 +222,11 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
} else {
KNOWHERE_THROW_MSG("conf error");
}
// if (quantizer_conf->gpu_id != gpu_id_) {
// KNOWHERE_THROW_MSG("quantizer and data must on the same gpu card");
// }
gpu_id_ = quantizer_conf->gpu_id;

if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(res, gpu_id_, false);
auto gpu_id = quantizer_conf->gpu_id;

if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id)) {
ResScope rs(res, gpu_id, false);
faiss::gpu::GpuClonerOptions option;
option.allInGpu = true;

@@ -211,18 +239,20 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
index_composition->quantizer = ivf_quantizer->quantizer;
index_composition->mode = quantizer_conf->mode; // only 2

auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index_composition, &option);
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id, index_composition, &option);
std::shared_ptr<faiss::Index> new_idx;
new_idx.reset(gpu_index);
auto sq_idx = std::make_shared<IVFSQHybrid>(new_idx, gpu_id_, res);
auto sq_idx = std::make_shared<IVFSQHybrid>(new_idx, gpu_id, res);
return sq_idx;
} else {
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu: " + std::to_string(gpu_id_) + "resource");
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu: " + std::to_string(gpu_id) + "resource");
}
}

std::pair<VectorIndexPtr, QuantizerPtr>
IVFSQHybrid::CopyCpuToGpuWithQuantizer(const int64_t& device_id, const Config& config) {
// std::lock_guard<std::mutex> lk(g_mutex);

if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) {
ResScope rs(res, device_id, false);
faiss::gpu::GpuClonerOptions option;
@@ -242,12 +272,29 @@ IVFSQHybrid::CopyCpuToGpuWithQuantizer(const int64_t& device_id, const Config& c
auto q = std::make_shared<FaissIVFQuantizer>();
q->quantizer = index_composition.quantizer;
q->size = index_composition.quantizer->d * index_composition.quantizer->getNumVecs() * sizeof(float);
q->gpu_id = device_id;
return std::make_pair(new_idx, q);
} else {
KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu: " + std::to_string(gpu_id_) + "resource");
}
}

void
IVFSQHybrid::set_index_model(IndexModelPtr model) {
std::lock_guard<std::mutex> lk(mutex_);

auto host_index = std::static_pointer_cast<IVFIndexModel>(model);
if (auto gpures = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(gpures, gpu_id_, false);
auto device_index = faiss::gpu::index_cpu_to_gpu(gpures->faiss_res.get(), gpu_id_, host_index->index_.get());
index_.reset(device_index);
res_ = gpures;
gpu_mode = 2;
} else {
KNOWHERE_THROW_MSG("load index model error, can't get gpu_resource");
}
}

FaissIVFQuantizer::~FaissIVFQuantizer() {
if (quantizer != nullptr) {
delete quantizer;
@@ -307,5 +354,10 @@ IVFSQHybrid::LoadImpl(const BinarySet& index_binary) {
GPUIVF::LoadImpl(index_binary);
}

void
IVFSQHybrid::set_index_model(IndexModelPtr model) {
GPUIVF::set_index_model(model);
}

#endif
} // namespace knowhere
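
To summarize the LoadQuantizer / SetQuantizer / UnsetQuantizer changes above: one index loads the coarse quantizer onto a GPU and hands it out as a FaissIVFQuantizer that now carries its gpu_id; another index can attach it, remembering the device in quantizer_gpu_id_ and switching to hybrid mode. A toy model with stand-in types (not knowhere classes):

    #include <cassert>
    #include <cstdint>
    #include <memory>

    struct ToyQuantizer {
        int64_t gpu_id = -1;  // filled in where the diff sets q->gpu_id
    };

    struct ToyHybridIndex {
        std::shared_ptr<ToyQuantizer> quantizer;
        int64_t quantizer_gpu_id_ = -1;
        int64_t gpu_mode = 0;

        void SetQuantizer(const std::shared_ptr<ToyQuantizer>& q) {
            quantizer = q;
            quantizer_gpu_id_ = q->gpu_id;  // remember which GPU holds the coarse quantizer
            gpu_mode = 1;                   // hybrid: quantizer on GPU, inverted lists on CPU
        }
        void UnsetQuantizer() {
            // Mirrors the diff: detach the quantizer and forget its device.
            quantizer = nullptr;
            quantizer_gpu_id_ = -1;
        }
    };

    int main() {
        auto q = std::make_shared<ToyQuantizer>();
        q->gpu_id = 0;

        ToyHybridIndex index;
        index.SetQuantizer(q);
        assert(index.quantizer_gpu_id_ == 0 && index.gpu_mode == 1);

        index.UnsetQuantizer();
        assert(index.quantizer_gpu_id_ == -1);
        return 0;
    }
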
@@ -17,7 +17,9 @@

#pragma once

#include <faiss/gpu/GpuIndexFlat.h>
#include <faiss/index_io.h>

#include <memory>
#include <utility>

@@ -29,6 +31,7 @@ namespace knowhere {
#ifdef CUSTOMIZATION
struct FaissIVFQuantizer : public Quantizer {
faiss::gpu::GpuIndexFlat* quantizer = nullptr;
int64_t gpu_id;

~FaissIVFQuantizer() override;
};
@@ -52,6 +55,9 @@ class IVFSQHybrid : public GPUIVFSQ {
}

public:
void
set_index_model(IndexModelPtr model) override;

QuantizerPtr
LoadQuantizer(const Config& conf);

@@ -85,6 +91,7 @@ class IVFSQHybrid : public GPUIVFSQ {

protected:
int64_t gpu_mode = 0; // 0,1,2
int64_t quantizer_gpu_id_ = -1;
};

} // namespace knowhere

@@ -48,6 +48,7 @@ class VectorIndex : public Index {
virtual void
Seal() = 0;

// TODO(linxj): Deprecated
virtual VectorIndexPtr
Clone() = 0;

@@ -17,7 +17,7 @@

#pragma once

#include <faiss/AuxIndexStructures.h>
#include <faiss/impl/io.h>

namespace knowhere {

core/src/index/thirdparty/versions.txt (vendored)
@@ -3,4 +3,4 @@ BOOST_VERSION=1.70.0
GTEST_VERSION=1.8.1
LAPACK_VERSION=v3.8.0
OPENBLAS_VERSION=v0.3.6
FAISS_VERSION=branch-0.2.1
FAISS_VERSION=branch-0.3.0
@@ -26,7 +26,7 @@
#include "knowhere/index/vector_index/IndexIVFSQ.h"
#include "knowhere/index/vector_index/IndexIVFSQHybrid.h"

constexpr int DEVICEID = 0;
int DEVICEID = 0;
constexpr int64_t DIM = 128;
constexpr int64_t NB = 10000;
constexpr int64_t NQ = 10;

@@ -16,17 +16,23 @@
// under the License.

#include <gtest/gtest.h>
#include <thread>

#include "unittest/Helper.h"
#include "unittest/utils.h"

#include "knowhere/common/Timer.h"

class SingleIndexTest : public DataGen, public TestGpuIndexBase {
protected:
void
SetUp() override {
TestGpuIndexBase::SetUp();
Generate(DIM, NB, NQ);
k = K;
nb = 1000000;
nq = 1000;
dim = DIM;
Generate(dim, nb, nq);
k = 1000;
}

void
@@ -119,4 +125,113 @@ TEST_F(SingleIndexTest, IVFSQHybrid) {
}
}

// TEST_F(SingleIndexTest, thread_safe) {
// assert(!xb.empty());
//
// index_type = "IVFSQHybrid";
// index_ = IndexFactory(index_type);
// auto base = ParamGenerator::GetInstance().Gen(ParameterType::ivfsq);
// auto conf = std::dynamic_pointer_cast<knowhere::IVFSQCfg>(base);
// conf->nlist = 16384;
// conf->k = k;
// conf->nprobe = 10;
// conf->d = dim;
// auto preprocessor = index_->BuildPreprocessor(base_dataset, conf);
// index_->set_preprocessor(preprocessor);
//
// auto model = index_->Train(base_dataset, conf);
// index_->set_index_model(model);
// index_->Add(base_dataset, conf);
// EXPECT_EQ(index_->Count(), nb);
// EXPECT_EQ(index_->Dimension(), dim);
//
// auto binaryset = index_->Serialize();
//
//
//
// auto cpu_idx = std::make_shared<knowhere::IVFSQHybrid>(DEVICEID);
// cpu_idx->Load(binaryset);
// auto pair = cpu_idx->CopyCpuToGpuWithQuantizer(DEVICEID, conf);
// auto quantizer = pair.second;
//
// auto quantizer_conf = std::make_shared<knowhere::QuantizerCfg>();
// quantizer_conf->mode = 2; // only copy data
// quantizer_conf->gpu_id = DEVICEID;
//
// auto CopyAllToGpu = [&](int64_t search_count, bool do_search = false) {
// for (int i = 0; i < search_count; ++i) {
// auto gpu_idx = cpu_idx->CopyCpuToGpu(DEVICEID, conf);
// if (do_search) {
// auto result = gpu_idx->Search(query_dataset, conf);
// AssertAnns(result, nq, conf->k);
// }
// }
// };
//
// auto hybrid_qt_idx = std::make_shared<knowhere::IVFSQHybrid>(DEVICEID);
// hybrid_qt_idx->Load(binaryset);
// auto SetQuantizerDoSearch = [&](int64_t search_count) {
// for (int i = 0; i < search_count; ++i) {
// hybrid_qt_idx->SetQuantizer(quantizer);
// auto result = hybrid_qt_idx->Search(query_dataset, conf);
// AssertAnns(result, nq, conf->k);
// // PrintResult(result, nq, k);
// hybrid_qt_idx->UnsetQuantizer();
// }
// };
//
// auto hybrid_data_idx = std::make_shared<knowhere::IVFSQHybrid>(DEVICEID);
// hybrid_data_idx->Load(binaryset);
// auto LoadDataDoSearch = [&](int64_t search_count, bool do_search = false) {
// for (int i = 0; i < search_count; ++i) {
// auto hybrid_idx = hybrid_data_idx->LoadData(quantizer, quantizer_conf);
// if (do_search) {
// auto result = hybrid_idx->Search(query_dataset, conf);
//// AssertAnns(result, nq, conf->k);
// }
// }
// };
//
// knowhere::TimeRecorder tc("");
// CopyAllToGpu(2000/2, false);
// tc.RecordSection("CopyAllToGpu witout search");
// CopyAllToGpu(400/2, true);
// tc.RecordSection("CopyAllToGpu with search");
// SetQuantizerDoSearch(6);
// tc.RecordSection("SetQuantizer with search");
// LoadDataDoSearch(2000/2, false);
// tc.RecordSection("LoadData without search");
// LoadDataDoSearch(400/2, true);
// tc.RecordSection("LoadData with search");
//
// {
// std::thread t1(CopyAllToGpu, 2000, false);
// std::thread t2(CopyAllToGpu, 400, true);
// t1.join();
// t2.join();
// }
//
// {
// std::thread t1(SetQuantizerDoSearch, 12);
// std::thread t2(CopyAllToGpu, 400, true);
// t1.join();
// t2.join();
// }
//
// {
// std::thread t1(SetQuantizerDoSearch, 12);
// std::thread t2(LoadDataDoSearch, 400, true);
// t1.join();
// t2.join();
// }
//
// {
// std::thread t1(LoadDataDoSearch, 2000, false);
// std::thread t2(LoadDataDoSearch, 400, true);
// t1.join();
// t2.join();
// }
//
//}

#endif

@@ -20,19 +20,12 @@
#include <iostream>
#include <thread>

#include <faiss/AutoTune.h>
#include <faiss/gpu/GpuAutoTune.h>
#include <faiss/gpu/GpuIndexIVFFlat.h>

#include "knowhere/common/Exception.h"
#include "knowhere/common/Timer.h"
#include "knowhere/index/vector_index/IndexGPUIVF.h"
#include "knowhere/index/vector_index/IndexGPUIVFPQ.h"
#include "knowhere/index/vector_index/IndexGPUIVFSQ.h"
#include "knowhere/index/vector_index/IndexIVF.h"
#include "knowhere/index/vector_index/IndexIVFPQ.h"
#include "knowhere/index/vector_index/IndexIVFSQ.h"
#include "knowhere/index/vector_index/IndexIVFSQHybrid.h"
#include "knowhere/index/vector_index/helpers/Cloner.h"

#include "unittest/Helper.h"
@@ -51,6 +44,9 @@ class IVFTest : public DataGen, public TestWithParam<::std::tuple<std::string, P
ParameterType parameter_type;
std::tie(index_type, parameter_type) = GetParam();
// Init_with_default();
// nb = 1000000;
// nq = 1000;
// k = 1000;
Generate(DIM, NB, NQ);
index_ = IndexFactory(index_type);
conf = ParamGenerator::GetInstance().Gen(parameter_type);
@@ -61,16 +57,6 @@ class IVFTest : public DataGen, public TestWithParam<::std::tuple<std::string, P
knowhere::FaissGpuResourceMgr::GetInstance().Free();
}

knowhere::VectorIndexPtr
ChooseTodo() {
std::vector<std::string> gpu_idx{"GPUIVFSQ"};
auto finder = std::find(gpu_idx.cbegin(), gpu_idx.cend(), index_type);
if (finder != gpu_idx.cend()) {
return knowhere::cloner::CopyCpuToGpu(index_, DEVICEID, knowhere::Config());
}
return index_;
}

protected:
std::string index_type;
knowhere::Config conf;
@@ -100,8 +86,7 @@ TEST_P(IVFTest, ivf_basic) {
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dimension(), dim);

auto new_idx = ChooseTodo();
auto result = new_idx->Search(query_dataset, conf);
auto result = index_->Search(query_dataset, conf);
AssertAnns(result, nq, conf->k);
// PrintResult(result, nq, k);
}
@@ -134,8 +119,7 @@ TEST_P(IVFTest, ivf_serialize) {

index_->set_index_model(model);
index_->Add(base_dataset, conf);
auto new_idx = ChooseTodo();
auto result = new_idx->Search(query_dataset, conf);
auto result = index_->Search(query_dataset, conf);
AssertAnns(result, nq, conf->k);
}

@@ -159,8 +143,7 @@ TEST_P(IVFTest, ivf_serialize) {
index_->Load(binaryset);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dimension(), dim);
auto new_idx = ChooseTodo();
auto result = new_idx->Search(query_dataset, conf);
auto result = index_->Search(query_dataset, conf);
AssertAnns(result, nq, conf->k);
}
}
@@ -176,8 +159,7 @@ TEST_P(IVFTest, clone_test) {
index_->Add(base_dataset, conf);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dimension(), dim);
auto new_idx = ChooseTodo();
auto result = new_idx->Search(query_dataset, conf);
auto result = index_->Search(query_dataset, conf);
AssertAnns(result, nq, conf->k);
// PrintResult(result, nq, k);

@@ -210,12 +192,6 @@ TEST_P(IVFTest, clone_test) {
// }
// }

{
if (index_type == "IVFSQHybrid") {
return;
}
}

{
// copy from gpu to cpu
std::vector<std::string> support_idx_vec{"GPUIVF", "GPUIVFSQ", "IVFSQHybrid"};
@@ -277,8 +253,7 @@ TEST_P(IVFTest, gpu_seal_test) {
index_->Add(base_dataset, conf);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dimension(), dim);
auto new_idx = ChooseTodo();
auto result = new_idx->Search(query_dataset, conf);
auto result = index_->Search(query_dataset, conf);
AssertAnns(result, nq, conf->k);

auto cpu_idx = knowhere::cloner::CopyGpuToCpu(index_, knowhere::Config());

@@ -91,7 +91,7 @@ JobMgr::worker_function() {
// disk resources NEVER be empty.
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
for (auto& task : tasks) {
disk->task_table().Put(task);
disk->task_table().Put(task, nullptr);
}
}
}

@@ -94,6 +94,7 @@ class OptimizerInst {
std::lock_guard<std::mutex> lock(mutex_);
if (instance == nullptr) {
std::vector<PassPtr> pass_list;
pass_list.push_back(std::make_shared<LargeSQ8HPass>());
pass_list.push_back(std::make_shared<HybridPass>());
instance = std::make_shared<Optimizer>(pass_list);
}

@@ -120,7 +120,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
if (resource->HasExecutor() == false) {
load_completed_event->task_table_item_->Move();
}
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_, resource);
break;
}
default: { break; }

@@ -264,8 +264,8 @@ TaskTable::PickToExecute(uint64_t limit) {
}

void
TaskTable::Put(TaskPtr task) {
auto item = std::make_shared<TaskTableItem>();
TaskTable::Put(TaskPtr task, TaskTableItemPtr from) {
auto item = std::make_shared<TaskTableItem>(std::move(from));
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
@@ -276,21 +276,6 @@ TaskTable::Put(TaskPtr task) {
}
}

void
TaskTable::Put(std::vector<TaskPtr>& tasks) {
for (auto& task : tasks) {
auto item = std::make_shared<TaskTableItem>();
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
item->timestamp.start = get_current_timestamp();
table_.put(std::move(item));
}
if (subscriber_) {
subscriber_();
}
}

size_t
TaskTable::TaskToExecute() {
size_t count = 0;

@@ -58,8 +58,12 @@ struct TaskTimestamp : public interface::dumpable {
Dump() const override;
};

struct TaskTableItem;
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;

struct TaskTableItem : public interface::dumpable {
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
explicit TaskTableItem(TaskTableItemPtr f = nullptr)
: id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex(), from(std::move(f)) {
}

TaskTableItem(const TaskTableItem& src) = delete;
@@ -70,6 +74,7 @@ struct TaskTableItem : public interface::dumpable {
TaskTableItemState state; // the state;
std::mutex mutex;
TaskTimestamp timestamp;
TaskTableItemPtr from;

bool
IsFinish();
@@ -96,8 +101,6 @@ struct TaskTableItem : public interface::dumpable {
Dump() const override;
};

using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;

class TaskTable : public interface::dumpable {
public:
TaskTable() : table_(1ULL << 16ULL) {
@@ -120,14 +123,7 @@ class TaskTable : public interface::dumpable {
* Put one task;
*/
void
Put(TaskPtr task);

/*
* Put tasks back of task table;
* Called by DBImpl;
*/
void
Put(std::vector<TaskPtr>& tasks);
Put(TaskPtr task, TaskTableItemPtr from = nullptr);

size_t
TaskToExecute();

@@ -28,13 +28,13 @@ namespace scheduler {
class Action {
public:
static void
PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self);
PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self);

static void
PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self);
PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self);

static void
PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest);
PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest);

static void
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,

@@ -59,7 +59,7 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
}

void
Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self) {
auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) {
std::vector<uint64_t> speeds;
@@ -78,7 +78,7 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
for (uint64_t i = 0; i < speeds.size(); ++i) {
rd_speed -= speeds[i];
if (rd_speed <= 0) {
neighbours[i].first->task_table().Put(task);
neighbours[i].first->task_table().Put(task_item->task, task_item);
return;
}
}
@@ -89,22 +89,23 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
}

void
Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self) {
auto neighbours = get_neighbours(self);
for (auto& neighbour : neighbours) {
neighbour->task_table().Put(task);
neighbour->task_table().Put(task_item->task, task_item);
}
}

void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
dest->task_table().Put(task);
Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest) {
dest->task_table().Put(task_item->task, task_item);
}

void
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
bool moved = false;
@@ -119,7 +120,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_->task, dest_resource);
PushTaskToResource(event->task_table_item_, dest_resource);
break;
}
}
@@ -127,7 +128,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
}

if (not moved) {
PushTaskToNeighbourRandomly(task, resource);
PushTaskToNeighbourRandomly(task_item, resource);
}
}
}
@@ -135,6 +136,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
void
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
auto task_item = event->task_table_item_;
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
@@ -213,7 +215,7 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou
// next_res->task_table().Put(task);
// }
event->task_table_item_->Move();
next_res->task_table().Put(task);
next_res->task_table().Put(task, task_item);
}
}

@@ -26,48 +26,48 @@
namespace milvus {
namespace scheduler {

// bool
// LargeSQ8HPass::Run(const TaskPtr& task) {
// if (task->Type() != TaskType::SearchTask) {
// return false;
// }
//
// auto search_task = std::static_pointer_cast<XSearchTask>(task);
// if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8H) {
// return false;
// }
//
// auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());
//
// // TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
// if (search_job->nq() < 100) {
// return false;
// }
//
// std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
// std::vector<int64_t> all_free_mem;
// for (auto& gpu : gpus) {
// auto cache = cache::GpuCacheMgr::GetInstance(gpu);
// auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
// all_free_mem.push_back(free_mem);
// }
//
// auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
// auto best_index = std::distance(all_free_mem.begin(), max_e);
// auto best_device_id = gpus[best_index];
//
// ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
// if (not res_ptr) {
// SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid.";
// // TODO: throw critical error and exit
// return false;
// }
//
// auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
// task->label() = label;
//
// return true;
// }
bool
LargeSQ8HPass::Run(const TaskPtr& task) {
if (task->Type() != TaskType::SearchTask) {
return false;
}

auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8H) {
return false;
}

auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock());

// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
if (search_job->nq() < 100) {
return false;
}

std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
std::vector<int64_t> all_free_mem;
for (auto& gpu : gpus) {
auto cache = cache::GpuCacheMgr::GetInstance(gpu);
auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
all_free_mem.push_back(free_mem);
}

auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
auto best_index = std::distance(all_free_mem.begin(), max_e);
auto best_device_id = gpus[best_index];

ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id);
if (not res_ptr) {
SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid.";
// TODO: throw critical error and exit
return false;
}

auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;

return true;
}

} // namespace scheduler
} // namespace milvus
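
The re-enabled LargeSQ8HPass::Run above is one pass in the optimizer chain built in the OptimizerInst hunk earlier (LargeSQ8HPass, then HybridPass). The sketch below only illustrates the return-value convention that code suggests: a pass returns false to decline a task and true once it has labeled it. Whether the real Optimizer stops at the first accepting pass is an assumption here, and the Toy types are not scheduler classes.

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <vector>

    struct ToyTask {
        int64_t nq = 0;
        bool gpu_labeled = false;
    };
    using ToyPass = std::function<bool(ToyTask&)>;

    int main() {
        std::vector<ToyPass> pass_list = {
            // Mirrors LargeSQ8HPass::Run: small queries are declined (stay on CPU),
            // large ones get a GPU label and the pass reports success.
            [](ToyTask& t) {
                if (t.nq < 100) return false;
                t.gpu_labeled = true;
                return true;
            },
        };

        ToyTask small_query;  small_query.nq = 10;
        ToyTask large_query;  large_query.nq = 1000;

        auto run_passes = [&](ToyTask& t) {
            for (auto& pass : pass_list) {
                if (pass(t)) break;  // assumption: first accepting pass wins
            }
        };
        run_passes(small_query);
        run_passes(large_query);

        std::cout << small_query.gpu_labeled << " " << large_query.gpu_labeled << std::endl;  // 0 1
        return 0;
    }
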
@@ -37,8 +37,8 @@ class LargeSQ8HPass : public Pass {
LargeSQ8HPass() = default;

public:
// bool
// Run(const TaskPtr& task) override;
bool
Run(const TaskPtr& task) override;
};

using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>;

@@ -180,6 +180,10 @@ Resource::loader_function() {
}
LoadFile(task_item->task);
task_item->Loaded();
if (task_item->from) {
task_item->from->Moved();
task_item->from = nullptr;
}
if (subscriber_) {
auto event = std::make_shared<LoadCompletedEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
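
The loader_function hunk above is where the new TaskTableItem::from back-link pays off: an item put into another resource's table via Put(task, from) remembers its source item, and once the destination finishes loading, the source is marked MOVED (changelog entry #130). A minimal model with simplified stand-in types, not the scheduler classes:

    #include <iostream>
    #include <memory>
    #include <string>

    struct ToyItem {
        std::string state = "START";
        std::shared_ptr<ToyItem> from;  // source item on the previous resource, if any
        void Loaded() { state = "LOADED"; }
        void Moved() { state = "MOVED"; }
    };

    int main() {
        auto source = std::make_shared<ToyItem>();  // item sitting on the disk/CPU resource
        auto dest = std::make_shared<ToyItem>();    // item created by Put(task, source) on the target
        dest->from = source;

        // Destination resource finishes loading the task, as in Resource::loader_function:
        dest->Loaded();
        if (dest->from) {
            dest->from->Moved();
            dest->from = nullptr;
        }
        std::cout << source->state << " / " << dest->state << std::endl;  // prints: MOVED / LOADED
        return 0;
    }
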
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <faiss/utils.h>
#include <faiss/utils/distances.h>
#include <omp.h>
#include <cmath>
#include <string>

@@ -71,6 +71,7 @@ class VecIndex : public cache::DataObj {
virtual VecIndexPtr
CopyToCpu(const Config& cfg = Config()) = 0;

// TODO(linxj): Deprecated
virtual VecIndexPtr
Clone() = 0;

@@ -108,7 +108,6 @@ TEST_F(EngineTest, ENGINE_IMPL_TEST) {
ASSERT_EQ(engine_ptr->Dimension(), dimension);
ASSERT_EQ(engine_ptr->Count(), ids.size());

status = engine_ptr->CopyToGpu(0, true);
status = engine_ptr->CopyToGpu(0, false);
//ASSERT_TRUE(status.ok());

@@ -65,7 +65,7 @@ static const char
"    cache_insert_data: false         # whether load inserted data into cache\n"
"\n"
"engine_config:\n"
"  blas_threshold: 20\n"
"  use_blas_threshold: 20\n"
"\n"
"resource_config:\n"
"  search_resources:\n"

@@ -193,16 +193,13 @@ TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {

TEST_F(TaskTableBaseTest, PUT_BATCH) {
std::vector<milvus::scheduler::TaskPtr> tasks{task1_, task2_};
empty_table_.Put(tasks);
for (auto& task : tasks) {
empty_table_.Put(task);
}
ASSERT_EQ(empty_table_.at(0)->task, task1_);
ASSERT_EQ(empty_table_.at(1)->task, task2_);
}

TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
std::vector<milvus::scheduler::TaskPtr> tasks{};
empty_table_.Put(tasks);
}

TEST_F(TaskTableBaseTest, SIZE) {
ASSERT_EQ(empty_table_.size(), 0);
empty_table_.Put(task1_);

@@ -74,7 +74,7 @@ INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest,
10,
10),
std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_CPU, "Default", DIM, NB, 10, 10),
// std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_GPU, "Default", DIM, NB, 10, 10),
std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_GPU, "Default", DIM, NB, 10, 10),
std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_MIX, "Default", DIM, NB, 10, 10),
// std::make_tuple(IndexType::NSG_MIX, "Default", 128, 250000, 10, 10),
// std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default", 128, 250000, 10, 10),