refactor(db): impl template for engine type

Former-commit-id: 2e9a6fa992f58dfe567f7c274b9327454cf3f89e
This commit is contained in:
Xu Peng 2019-04-30 21:02:13 +08:00
parent 0e37089c6a
commit 166289cafa
4 changed files with 91 additions and 60 deletions

View File

@ -16,37 +16,43 @@ namespace zilliz {
namespace vecwise {
namespace engine {
DBImpl::DBImpl(const Options& options)
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
: _env(options.env),
_options(options),
_bg_compaction_scheduled(false),
_shutting_down(false),
bg_build_index_started_(false),
_pMeta(new meta::DBMetaImpl(_options.meta)),
_pMemMgr(new MemManager(_pMeta, _options)) {
_pMemMgr(new MemManager<EngineT>(_pMeta, _options)) {
start_timer_task(_options.memory_sync_interval);
}
Status DBImpl::add_group(meta::GroupSchema& group_info) {
template<typename EngineT>
Status DBImpl<EngineT>::add_group(meta::GroupSchema& group_info) {
return _pMeta->add_group(group_info);
}
Status DBImpl::get_group(meta::GroupSchema& group_info) {
template<typename EngineT>
Status DBImpl<EngineT>::get_group(meta::GroupSchema& group_info) {
return _pMeta->get_group(group_info);
}
Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) {
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not_) {
return _pMeta->has_group(group_id_, has_or_not_);
}
Status DBImpl::get_group_files(const std::string& group_id,
template<typename EngineT>
Status DBImpl<EngineT>::get_group_files(const std::string& group_id,
const int date_delta,
meta::GroupFilesSchema& group_files_info) {
return _pMeta->get_group_files(group_id, date_delta, group_files_info);
}
Status DBImpl::add_vectors(const std::string& group_id_,
template<typename EngineT>
Status DBImpl<EngineT>::add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) {
Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
if (!status.ok()) {
@ -54,13 +60,15 @@ Status DBImpl::add_vectors(const std::string& group_id_,
}
}
Status DBImpl::search(const std::string &group_id, size_t k, size_t nq,
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string &group_id, size_t k, size_t nq,
const float *vectors, QueryResults &results) {
meta::DatesT dates = {meta::Meta::GetDate()};
return search(group_id, k, nq, vectors, dates, results);
}
Status DBImpl::search(const std::string& group_id, size_t k, size_t nq,
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedGroupFilesSchema files;
@ -158,12 +166,14 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq,
return Status::OK();
}
void DBImpl::start_timer_task(int interval_) {
std::thread bg_task(&DBImpl::background_timer_task, this, interval_);
template<typename EngineT>
void DBImpl<EngineT>::start_timer_task(int interval_) {
std::thread bg_task(&DBImpl<EngineT>::background_timer_task, this, interval_);
bg_task.detach();
}
void DBImpl::background_timer_task(int interval_) {
template<typename EngineT>
void DBImpl<EngineT>::background_timer_task(int interval_) {
Status status;
while (true) {
if (!_bg_error.ok()) break;
@ -175,19 +185,22 @@ void DBImpl::background_timer_task(int interval_) {
}
}
void DBImpl::try_schedule_compaction() {
template<typename EngineT>
void DBImpl<EngineT>::try_schedule_compaction() {
if (_bg_compaction_scheduled) return;
if (!_bg_error.ok()) return;
_bg_compaction_scheduled = true;
_env->schedule(&DBImpl::BGWork, this);
_env->schedule(&DBImpl<EngineT>::BGWork, this);
}
void DBImpl::BGWork(void* db_) {
template<typename EngineT>
void DBImpl<EngineT>::BGWork(void* db_) {
reinterpret_cast<DBImpl*>(db_)->background_call();
}
void DBImpl::background_call() {
template<typename EngineT>
void DBImpl<EngineT>::background_call() {
std::lock_guard<std::mutex> lock(_mutex);
assert(_bg_compaction_scheduled);
@ -201,7 +214,8 @@ void DBImpl::background_call() {
}
Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
template<typename EngineT>
Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::DateT& date,
const meta::GroupFilesSchema& files) {
meta::GroupFileSchema group_file;
group_file.group_id = group_id;
@ -248,7 +262,8 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
return status;
}
Status DBImpl::background_merge_files(const std::string& group_id) {
template<typename EngineT>
Status DBImpl<EngineT>::background_merge_files(const std::string& group_id) {
meta::DatePartionedGroupFilesSchema raw_files;
auto status = _pMeta->files_to_merge(group_id, raw_files);
if (!status.ok()) {
@ -277,7 +292,8 @@ Status DBImpl::background_merge_files(const std::string& group_id) {
return Status::OK();
}
Status DBImpl::build_index(const meta::GroupFileSchema& file) {
template<typename EngineT>
Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
meta::GroupFileSchema group_file;
group_file.group_id = file.group_id;
group_file.date = file.date;
@ -305,7 +321,8 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) {
return Status::OK();
}
void DBImpl::background_build_index() {
template<typename EngineT>
void DBImpl<EngineT>::background_build_index() {
std::lock_guard<std::mutex> lock(build_index_mutex_);
assert(bg_build_index_started_);
meta::GroupFilesSchema to_index_files;
@ -325,16 +342,18 @@ void DBImpl::background_build_index() {
bg_build_index_finish_signal_.notify_all();
}
Status DBImpl::try_build_index() {
template<typename EngineT>
Status DBImpl<EngineT>::try_build_index() {
if (bg_build_index_started_) return Status::OK();
if (_shutting_down.load(std::memory_order_acquire)) return Status::OK();
bg_build_index_started_ = true;
std::thread build_index_task(&DBImpl::background_build_index, this);
std::thread build_index_task(&DBImpl<EngineT>::background_build_index, this);
build_index_task.detach();
return Status::OK();
}
void DBImpl::background_compaction() {
template<typename EngineT>
void DBImpl<EngineT>::background_compaction() {
std::vector<std::string> group_ids;
_pMemMgr->serialize(group_ids);
@ -348,15 +367,18 @@ void DBImpl::background_compaction() {
}
}
Status DBImpl::drop_all() {
template<typename EngineT>
Status DBImpl<EngineT>::drop_all() {
return _pMeta->drop_all();
}
Status DBImpl::count(const std::string& group_id, long& result) {
template<typename EngineT>
Status DBImpl<EngineT>::count(const std::string& group_id, long& result) {
return _pMeta->count(group_id, result);
}
DBImpl::~DBImpl() {
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
{
std::unique_lock<std::mutex> lock(_mutex);
_shutting_down.store(true, std::memory_order_release);
@ -382,7 +404,7 @@ DB::~DB() {}
void DB::Open(const Options& options, DB** dbptr) {
*dbptr = nullptr;
*dbptr = new DBImpl(options);
*dbptr = new DBImpl<FaissExecutionEngine>(options);
return;
}

View File

@ -18,6 +18,7 @@ namespace meta {
class Meta;
}
template <typename EngineT>
class DBImpl : public DB {
public:
DBImpl(const Options& options);
@ -46,6 +47,8 @@ public:
virtual ~DBImpl();
private:
typedef MemManager<EngineT> MemManagerT;
void background_build_index();
Status build_index(const meta::GroupFileSchema&);
Status try_build_index();
@ -76,7 +79,7 @@ private:
std::condition_variable bg_build_index_finish_signal_;
std::shared_ptr<meta::Meta> _pMeta;
std::shared_ptr<MemManager> _pMemMgr;
std::shared_ptr<MemManagerT> _pMemMgr;
}; // DBImpl

View File

@ -1,3 +1,6 @@
#ifndef MEMMANGE_CPP__
#define MEMMANGE_CPP__
#include <iostream>
#include <sstream>
#include <thread>
@ -5,23 +8,24 @@
#include "MemManager.h"
#include "Meta.h"
#include "FaissExecutionEngine.h"
namespace zilliz {
namespace vecwise {
namespace engine {
MemVectors::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
template<typename EngineT>
MemVectors<EngineT>::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
const meta::GroupFileSchema& schema, const Options& options)
: pMeta_(meta_ptr),
options_(options),
schema_(schema),
_pIdGenerator(new SimpleIDGenerator()),
pEE_(new FaissExecutionEngine(schema_.dimension, schema_.location)) {
pEE_(new EngineT(schema_.dimension, schema_.location)) {
}
void MemVectors::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
template<typename EngineT>
void MemVectors<EngineT>::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
_pIdGenerator->getNextIDNumbers(n_, vector_ids_);
pEE_->AddWithIds(n_, vectors_, vector_ids_.data());
for(auto i=0 ; i<n_; i++) {
@ -29,15 +33,18 @@ void MemVectors::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
}
}
size_t MemVectors::total() const {
template<typename EngineT>
size_t MemVectors<EngineT>::total() const {
return pEE_->Count();
}
size_t MemVectors::approximate_size() const {
template<typename EngineT>
size_t MemVectors<EngineT>::approximate_size() const {
return pEE_->Size();
}
Status MemVectors::serialize(std::string& group_id) {
template<typename EngineT>
Status MemVectors<EngineT>::serialize(std::string& group_id) {
group_id = schema_.group_id;
auto rows = approximate_size();
pEE_->Serialize();
@ -52,7 +59,8 @@ Status MemVectors::serialize(std::string& group_id) {
return status;
}
MemVectors::~MemVectors() {
template<typename EngineT>
MemVectors<EngineT>::~MemVectors() {
if (_pIdGenerator != nullptr) {
delete _pIdGenerator;
_pIdGenerator = nullptr;
@ -63,7 +71,9 @@ MemVectors::~MemVectors() {
* MemManager
*/
VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) {
template<typename EngineT>
typename MemManager<EngineT>::VectorsPtr MemManager<EngineT>::get_mem_by_group(
const std::string& group_id) {
auto memIt = _memMap.find(group_id);
if (memIt != _memMap.end()) {
return memIt->second;
@ -76,11 +86,12 @@ VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) {
return nullptr;
}
_memMap[group_id] = std::shared_ptr<MemVectors>(new MemVectors(_pMeta, group_file, options_));
_memMap[group_id] = VectorsPtr(new MemVectors<EngineT>(_pMeta, group_file, options_));
return _memMap[group_id];
}
Status MemManager::add_vectors(const std::string& group_id_,
template<typename EngineT>
Status MemManager<EngineT>::add_vectors(const std::string& group_id_,
size_t n_,
const float* vectors_,
IDNumbers& vector_ids_) {
@ -88,11 +99,12 @@ Status MemManager::add_vectors(const std::string& group_id_,
return add_vectors_no_lock(group_id_, n_, vectors_, vector_ids_);
}
Status MemManager::add_vectors_no_lock(const std::string& group_id,
template<typename EngineT>
Status MemManager<EngineT>::add_vectors_no_lock(const std::string& group_id,
size_t n,
const float* vectors,
IDNumbers& vector_ids) {
std::shared_ptr<MemVectors> mem = get_mem_by_group(group_id);
VectorsPtr mem = get_mem_by_group(group_id);
if (mem == nullptr) {
return Status::NotFound("Group " + group_id + " not found!");
}
@ -101,7 +113,8 @@ Status MemManager::add_vectors_no_lock(const std::string& group_id,
return Status::OK();
}
Status MemManager::mark_memory_as_immutable() {
template<typename EngineT>
Status MemManager<EngineT>::mark_memory_as_immutable() {
std::unique_lock<std::mutex> lock(_mutex);
for (auto& kv: _memMap) {
_immMems.push_back(kv.second);
@ -111,20 +124,8 @@ Status MemManager::mark_memory_as_immutable() {
return Status::OK();
}
/* bool MemManager::need_serialize(double interval) { */
/* if (_immMems.size() > 0) { */
/* return false; */
/* } */
/* auto diff = std::difftime(std::time(nullptr), _last_compact_time); */
/* if (diff >= interval) { */
/* return true; */
/* } */
/* return false; */
/* } */
Status MemManager::serialize(std::vector<std::string>& group_ids) {
template<typename EngineT>
Status MemManager<EngineT>::serialize(std::vector<std::string>& group_ids) {
mark_memory_as_immutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
std::string group_id;
@ -141,3 +142,5 @@ Status MemManager::serialize(std::vector<std::string>& group_ids) {
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#endif

View File

@ -19,8 +19,7 @@ namespace meta {
class Meta;
}
class FaissExecutionEngine;
template <typename EngineT>
class MemVectors {
public:
explicit MemVectors(const std::shared_ptr<meta::Meta>&,
@ -47,15 +46,18 @@ private:
Options options_;
meta::GroupFileSchema schema_;
IDGenerator* _pIdGenerator;
std::shared_ptr<FaissExecutionEngine> pEE_;
std::shared_ptr<EngineT> pEE_;
}; // MemVectors
typedef std::shared_ptr<MemVectors> VectorsPtr;
template<typename EngineT>
class MemManager {
public:
typedef MemVectors<EngineT> ItemT;
typedef std::shared_ptr<ItemT> VectorsPtr;
MemManager(const std::shared_ptr<meta::Meta>& meta_, const Options& options)
: _pMeta(meta_), options_(options) {}
@ -85,5 +87,6 @@ private:
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#include "MemManager.cpp"
#endif