From 166289cafa57aa498dbf371f7548294fc6822a3d Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Tue, 30 Apr 2019 21:02:13 +0800 Subject: [PATCH] refactor(db): impl template for engine type Former-commit-id: 2e9a6fa992f58dfe567f7c274b9327454cf3f89e --- cpp/src/db/DBImpl.cpp | 76 +++++++++++++++++++++++++-------------- cpp/src/db/DBImpl.h | 5 ++- cpp/src/db/MemManager.cpp | 59 +++++++++++++++--------------- cpp/src/db/MemManager.h | 11 +++--- 4 files changed, 91 insertions(+), 60 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 646d6e79f2..9344832db1 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -16,37 +16,43 @@ namespace zilliz { namespace vecwise { namespace engine { -DBImpl::DBImpl(const Options& options) +template +DBImpl::DBImpl(const Options& options) : _env(options.env), _options(options), _bg_compaction_scheduled(false), _shutting_down(false), bg_build_index_started_(false), _pMeta(new meta::DBMetaImpl(_options.meta)), - _pMemMgr(new MemManager(_pMeta, _options)) { + _pMemMgr(new MemManager(_pMeta, _options)) { start_timer_task(_options.memory_sync_interval); } -Status DBImpl::add_group(meta::GroupSchema& group_info) { +template +Status DBImpl::add_group(meta::GroupSchema& group_info) { return _pMeta->add_group(group_info); } -Status DBImpl::get_group(meta::GroupSchema& group_info) { +template +Status DBImpl::get_group(meta::GroupSchema& group_info) { return _pMeta->get_group(group_info); } -Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) { +template +Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) { return _pMeta->has_group(group_id_, has_or_not_); } -Status DBImpl::get_group_files(const std::string& group_id, +template +Status DBImpl::get_group_files(const std::string& group_id, const int date_delta, meta::GroupFilesSchema& group_files_info) { return _pMeta->get_group_files(group_id, date_delta, group_files_info); } -Status DBImpl::add_vectors(const std::string& group_id_, +template +Status DBImpl::add_vectors(const std::string& group_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) { Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_); if (!status.ok()) { @@ -54,13 +60,15 @@ Status DBImpl::add_vectors(const std::string& group_id_, } } -Status DBImpl::search(const std::string &group_id, size_t k, size_t nq, +template +Status DBImpl::search(const std::string &group_id, size_t k, size_t nq, const float *vectors, QueryResults &results) { meta::DatesT dates = {meta::Meta::GetDate()}; return search(group_id, k, nq, vectors, dates, results); } -Status DBImpl::search(const std::string& group_id, size_t k, size_t nq, +template +Status DBImpl::search(const std::string& group_id, size_t k, size_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) { meta::DatePartionedGroupFilesSchema files; @@ -158,12 +166,14 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq, return Status::OK(); } -void DBImpl::start_timer_task(int interval_) { - std::thread bg_task(&DBImpl::background_timer_task, this, interval_); +template +void DBImpl::start_timer_task(int interval_) { + std::thread bg_task(&DBImpl::background_timer_task, this, interval_); bg_task.detach(); } -void DBImpl::background_timer_task(int interval_) { +template +void DBImpl::background_timer_task(int interval_) { Status status; while (true) { if (!_bg_error.ok()) break; @@ -175,19 +185,22 @@ void DBImpl::background_timer_task(int interval_) { } } -void DBImpl::try_schedule_compaction() { +template +void DBImpl::try_schedule_compaction() { if (_bg_compaction_scheduled) return; if (!_bg_error.ok()) return; _bg_compaction_scheduled = true; - _env->schedule(&DBImpl::BGWork, this); + _env->schedule(&DBImpl::BGWork, this); } -void DBImpl::BGWork(void* db_) { +template +void DBImpl::BGWork(void* db_) { reinterpret_cast(db_)->background_call(); } -void DBImpl::background_call() { +template +void DBImpl::background_call() { std::lock_guard lock(_mutex); assert(_bg_compaction_scheduled); @@ -201,7 +214,8 @@ void DBImpl::background_call() { } -Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, +template +Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, const meta::GroupFilesSchema& files) { meta::GroupFileSchema group_file; group_file.group_id = group_id; @@ -248,7 +262,8 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, return status; } -Status DBImpl::background_merge_files(const std::string& group_id) { +template +Status DBImpl::background_merge_files(const std::string& group_id) { meta::DatePartionedGroupFilesSchema raw_files; auto status = _pMeta->files_to_merge(group_id, raw_files); if (!status.ok()) { @@ -277,7 +292,8 @@ Status DBImpl::background_merge_files(const std::string& group_id) { return Status::OK(); } -Status DBImpl::build_index(const meta::GroupFileSchema& file) { +template +Status DBImpl::build_index(const meta::GroupFileSchema& file) { meta::GroupFileSchema group_file; group_file.group_id = file.group_id; group_file.date = file.date; @@ -305,7 +321,8 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { return Status::OK(); } -void DBImpl::background_build_index() { +template +void DBImpl::background_build_index() { std::lock_guard lock(build_index_mutex_); assert(bg_build_index_started_); meta::GroupFilesSchema to_index_files; @@ -325,16 +342,18 @@ void DBImpl::background_build_index() { bg_build_index_finish_signal_.notify_all(); } -Status DBImpl::try_build_index() { +template +Status DBImpl::try_build_index() { if (bg_build_index_started_) return Status::OK(); if (_shutting_down.load(std::memory_order_acquire)) return Status::OK(); bg_build_index_started_ = true; - std::thread build_index_task(&DBImpl::background_build_index, this); + std::thread build_index_task(&DBImpl::background_build_index, this); build_index_task.detach(); return Status::OK(); } -void DBImpl::background_compaction() { +template +void DBImpl::background_compaction() { std::vector group_ids; _pMemMgr->serialize(group_ids); @@ -348,15 +367,18 @@ void DBImpl::background_compaction() { } } -Status DBImpl::drop_all() { +template +Status DBImpl::drop_all() { return _pMeta->drop_all(); } -Status DBImpl::count(const std::string& group_id, long& result) { +template +Status DBImpl::count(const std::string& group_id, long& result) { return _pMeta->count(group_id, result); } -DBImpl::~DBImpl() { +template +DBImpl::~DBImpl() { { std::unique_lock lock(_mutex); _shutting_down.store(true, std::memory_order_release); @@ -382,7 +404,7 @@ DB::~DB() {} void DB::Open(const Options& options, DB** dbptr) { *dbptr = nullptr; - *dbptr = new DBImpl(options); + *dbptr = new DBImpl(options); return; } diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index eda179a9b2..77632796d4 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -18,6 +18,7 @@ namespace meta { class Meta; } +template class DBImpl : public DB { public: DBImpl(const Options& options); @@ -46,6 +47,8 @@ public: virtual ~DBImpl(); private: + typedef MemManager MemManagerT; + void background_build_index(); Status build_index(const meta::GroupFileSchema&); Status try_build_index(); @@ -76,7 +79,7 @@ private: std::condition_variable bg_build_index_finish_signal_; std::shared_ptr _pMeta; - std::shared_ptr _pMemMgr; + std::shared_ptr _pMemMgr; }; // DBImpl diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.cpp index 904c5db150..cf557b41df 100644 --- a/cpp/src/db/MemManager.cpp +++ b/cpp/src/db/MemManager.cpp @@ -1,3 +1,6 @@ +#ifndef MEMMANGE_CPP__ +#define MEMMANGE_CPP__ + #include #include #include @@ -5,23 +8,24 @@ #include "MemManager.h" #include "Meta.h" -#include "FaissExecutionEngine.h" namespace zilliz { namespace vecwise { namespace engine { -MemVectors::MemVectors(const std::shared_ptr& meta_ptr, +template +MemVectors::MemVectors(const std::shared_ptr& meta_ptr, const meta::GroupFileSchema& schema, const Options& options) : pMeta_(meta_ptr), options_(options), schema_(schema), _pIdGenerator(new SimpleIDGenerator()), - pEE_(new FaissExecutionEngine(schema_.dimension, schema_.location)) { + pEE_(new EngineT(schema_.dimension, schema_.location)) { } -void MemVectors::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) { +template +void MemVectors::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) { _pIdGenerator->getNextIDNumbers(n_, vector_ids_); pEE_->AddWithIds(n_, vectors_, vector_ids_.data()); for(auto i=0 ; i +size_t MemVectors::total() const { return pEE_->Count(); } -size_t MemVectors::approximate_size() const { +template +size_t MemVectors::approximate_size() const { return pEE_->Size(); } -Status MemVectors::serialize(std::string& group_id) { +template +Status MemVectors::serialize(std::string& group_id) { group_id = schema_.group_id; auto rows = approximate_size(); pEE_->Serialize(); @@ -52,7 +59,8 @@ Status MemVectors::serialize(std::string& group_id) { return status; } -MemVectors::~MemVectors() { +template +MemVectors::~MemVectors() { if (_pIdGenerator != nullptr) { delete _pIdGenerator; _pIdGenerator = nullptr; @@ -63,7 +71,9 @@ MemVectors::~MemVectors() { * MemManager */ -VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) { +template +typename MemManager::VectorsPtr MemManager::get_mem_by_group( + const std::string& group_id) { auto memIt = _memMap.find(group_id); if (memIt != _memMap.end()) { return memIt->second; @@ -76,11 +86,12 @@ VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) { return nullptr; } - _memMap[group_id] = std::shared_ptr(new MemVectors(_pMeta, group_file, options_)); + _memMap[group_id] = VectorsPtr(new MemVectors(_pMeta, group_file, options_)); return _memMap[group_id]; } -Status MemManager::add_vectors(const std::string& group_id_, +template +Status MemManager::add_vectors(const std::string& group_id_, size_t n_, const float* vectors_, IDNumbers& vector_ids_) { @@ -88,11 +99,12 @@ Status MemManager::add_vectors(const std::string& group_id_, return add_vectors_no_lock(group_id_, n_, vectors_, vector_ids_); } -Status MemManager::add_vectors_no_lock(const std::string& group_id, +template +Status MemManager::add_vectors_no_lock(const std::string& group_id, size_t n, const float* vectors, IDNumbers& vector_ids) { - std::shared_ptr mem = get_mem_by_group(group_id); + VectorsPtr mem = get_mem_by_group(group_id); if (mem == nullptr) { return Status::NotFound("Group " + group_id + " not found!"); } @@ -101,7 +113,8 @@ Status MemManager::add_vectors_no_lock(const std::string& group_id, return Status::OK(); } -Status MemManager::mark_memory_as_immutable() { +template +Status MemManager::mark_memory_as_immutable() { std::unique_lock lock(_mutex); for (auto& kv: _memMap) { _immMems.push_back(kv.second); @@ -111,20 +124,8 @@ Status MemManager::mark_memory_as_immutable() { return Status::OK(); } -/* bool MemManager::need_serialize(double interval) { */ -/* if (_immMems.size() > 0) { */ -/* return false; */ -/* } */ - -/* auto diff = std::difftime(std::time(nullptr), _last_compact_time); */ -/* if (diff >= interval) { */ -/* return true; */ -/* } */ - -/* return false; */ -/* } */ - -Status MemManager::serialize(std::vector& group_ids) { +template +Status MemManager::serialize(std::vector& group_ids) { mark_memory_as_immutable(); std::unique_lock lock(serialization_mtx_); std::string group_id; @@ -141,3 +142,5 @@ Status MemManager::serialize(std::vector& group_ids) { } // namespace engine } // namespace vecwise } // namespace zilliz + +#endif diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index c1d8736407..ec8e19173f 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -19,8 +19,7 @@ namespace meta { class Meta; } -class FaissExecutionEngine; - +template class MemVectors { public: explicit MemVectors(const std::shared_ptr&, @@ -47,15 +46,18 @@ private: Options options_; meta::GroupFileSchema schema_; IDGenerator* _pIdGenerator; - std::shared_ptr pEE_; + std::shared_ptr pEE_; }; // MemVectors -typedef std::shared_ptr VectorsPtr; +template class MemManager { public: + typedef MemVectors ItemT; + typedef std::shared_ptr VectorsPtr; + MemManager(const std::shared_ptr& meta_, const Options& options) : _pMeta(meta_), options_(options) {} @@ -85,5 +87,6 @@ private: } // namespace engine } // namespace vecwise } // namespace zilliz +#include "MemManager.cpp" #endif