diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 489228eca8..a5db7a5019 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -17,3 +17,4 @@ Please mark all change in change log and use the ticket from JIRA. - MS-1 - Add CHANGELOG.md - MS-4 - Refactor the vecwise_engine code structure - MS-6 - Implement SDK interface part 1 +- MS-20 - Clean Code Part 1 diff --git a/cpp/src/db/DB.cpp b/cpp/src/db/DB.cpp index 5baa09bd71..610e353358 100644 --- a/cpp/src/db/DB.cpp +++ b/cpp/src/db/DB.cpp @@ -7,8 +7,6 @@ #include "DBImpl.h" #include "DBMetaImpl.h" #include "Env.h" -/* #include "FaissExecutionEngine.h" */ -/* #include "Traits.h" */ #include "Factories.h" namespace zilliz { diff --git a/cpp/src/db/DB.h b/cpp/src/db/DB.h index 7d976ad824..0f1db62cfb 100644 --- a/cpp/src/db/DB.h +++ b/cpp/src/db/DB.h @@ -5,12 +5,13 @@ ******************************************************************************/ #pragma once -#include #include "Options.h" #include "Meta.h" #include "Status.h" #include "Types.h" +#include + namespace zilliz { namespace vecwise { namespace engine { @@ -21,29 +22,22 @@ class DB { public: static void Open(const Options& options, DB** dbptr); - virtual Status add_group(meta::GroupSchema& group_info_) = 0; - virtual Status get_group(meta::GroupSchema& group_info_) = 0; - virtual Status delete_vectors(const std::string& group_id, - const meta::DatesT& dates) = 0; - virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0; - virtual Status get_group_files(const std::string& group_id_, - const int date_delta_, - meta::GroupFilesSchema& group_files_info_) = 0; + virtual Status CreateTable(meta::TableSchema& table_schema_) = 0; + virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0; + virtual Status HasTable(const std::string& table_id_, bool& has_or_not_) = 0; - virtual Status add_vectors(const std::string& group_id_, + virtual Status InsertVectors(const std::string& table_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) = 0; - virtual Status search(const std::string& group_id, size_t k, size_t nq, + virtual Status Query(const std::string& table_id, size_t k, size_t nq, const float* vectors, QueryResults& results) = 0; - virtual Status search(const std::string& group_id, size_t k, size_t nq, + virtual Status Query(const std::string& table_id, size_t k, size_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) = 0; - virtual Status size(long& result) = 0; + virtual Status Size(long& result) = 0; - virtual Status drop_all() = 0; - - virtual Status count(const std::string& group_id, long& result) = 0; + virtual Status DropAll() = 0; DB() = default; DB(const DB&) = delete; diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 54c22eb48b..7384e53a34 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -5,15 +5,15 @@ ******************************************************************************/ #pragma once +#include "DB.h" +#include "MemManager.h" +#include "Types.h" +#include "Traits.h" + #include #include #include #include -#include "DB.h" -#include "MemManager.h" -#include "Types.h" -#include "FaissExecutionEngine.h" -#include "Traits.h" namespace zilliz { namespace vecwise { @@ -28,63 +28,56 @@ namespace meta { template class DBImpl : public DB { public: - typedef typename meta::Meta::Ptr MetaPtr; - typedef typename MemManager::Ptr MemManagerPtr; + using MetaPtr = meta::Meta::Ptr; + using MemManagerPtr = typename MemManager::Ptr; DBImpl(const Options& options); - virtual Status add_group(meta::GroupSchema& group_info) override; - virtual Status get_group(meta::GroupSchema& group_info) override; - virtual Status delete_vectors(const std::string& group_id, const meta::DatesT& dates) override; - virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; + virtual Status CreateTable(meta::TableSchema& table_schema) override; + virtual Status DescribeTable(meta::TableSchema& table_schema) override; + virtual Status HasTable(const std::string& table_id, bool& has_or_not) override; - virtual Status get_group_files(const std::string& group_id_, - const int date_delta_, - meta::GroupFilesSchema& group_files_info_) override; + virtual Status InsertVectors(const std::string& table_id, + size_t n, const float* vectors, IDNumbers& vector_ids) override; - virtual Status add_vectors(const std::string& group_id_, - size_t n, const float* vectors, IDNumbers& vector_ids_) override; - - virtual Status search(const std::string& group_id, size_t k, size_t nq, + virtual Status Query(const std::string& table_id, size_t k, size_t nq, const float* vectors, QueryResults& results) override; - virtual Status search(const std::string& group_id, size_t k, size_t nq, + virtual Status Query(const std::string& table_id, size_t k, size_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) override; - virtual Status drop_all() override; + virtual Status DropAll() override; - virtual Status count(const std::string& group_id, long& result) override; - - virtual Status size(long& result) override; + virtual Status Size(long& result) override; virtual ~DBImpl(); private: - void background_build_index(); - Status build_index(const meta::GroupFileSchema&); - Status try_build_index(); - Status merge_files(const std::string& group_id, + void BackgroundBuildIndex(); + Status BuildIndex(const meta::TableFileSchema&); + Status TryBuildIndex(); + Status MergeFiles(const std::string& table_id, const meta::DateT& date, - const meta::GroupFilesSchema& files); - Status background_merge_files(const std::string& group_id); + const meta::TableFilesSchema& files); + Status BackgroundMergeFiles(const std::string& table_id); - void try_schedule_compaction(); - void start_timer_task(int interval_); - void background_timer_task(int interval_); + void TrySchedule(); + void StartTimerTasks(int interval); + void BackgroundTimerTask(int interval); static void BGWork(void* db); - void background_call(); - void background_compaction(); + void BackgroundCall(); + void BackgroundCompaction(); - Env* const _env; - const Options _options; + Env* const env_; + const Options options_; - std::mutex _mutex; - std::condition_variable _bg_work_finish_signal; - bool _bg_compaction_scheduled; - Status _bg_error; - std::atomic _shutting_down; + std::mutex mutex_; + std::condition_variable bg_work_finish_signal_; + bool bg_compaction_scheduled_; + Status bg_error_; + std::atomic shutting_down_; std::mutex build_index_mutex_; bool bg_build_index_started_; @@ -92,8 +85,8 @@ private: std::thread bg_timer_thread_; - MetaPtr _pMeta; - MemManagerPtr _pMemMgr; + MetaPtr pMeta_; + MemManagerPtr pMemMgr_; }; // DBImpl @@ -102,4 +95,4 @@ private: } // namespace vecwise } // namespace zilliz -#include "DBImpl.cpp" +#include "DBImpl.inl" diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.inl similarity index 57% rename from cpp/src/db/DBImpl.cpp rename to cpp/src/db/DBImpl.inl index 4028a1bbbe..d061a99959 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.inl @@ -3,8 +3,11 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#ifndef DBIMPL_CPP__ -#define DBIMPL_CPP__ +#pragma once + +#include "DBImpl.h" +#include "DBMetaImpl.h" +#include "Env.h" #include #include @@ -14,86 +17,68 @@ #include #include -#include "DBImpl.h" -#include "DBMetaImpl.h" -#include "Env.h" - namespace zilliz { namespace vecwise { namespace engine { template DBImpl::DBImpl(const Options& options) - : _env(options.env), - _options(options), - _bg_compaction_scheduled(false), - _shutting_down(false), + : env_(options.env), + options_(options), + bg_compaction_scheduled_(false), + shutting_down_(false), bg_build_index_started_(false), - _pMeta(new meta::DBMetaImpl(_options.meta)), - _pMemMgr(new MemManager(_pMeta, _options)) { - start_timer_task(_options.memory_sync_interval); + pMeta_(new meta::DBMetaImpl(options_.meta)), + pMemMgr_(new MemManager(pMeta_, options_)) { + StartTimerTasks(options_.memory_sync_interval); } template -Status DBImpl::add_group(meta::GroupSchema& group_info) { - return _pMeta->add_group(group_info); +Status DBImpl::CreateTable(meta::TableSchema& table_schema) { + return pMeta_->CreateTable(table_schema); } template -Status DBImpl::get_group(meta::GroupSchema& group_info) { - return _pMeta->get_group(group_info); +Status DBImpl::DescribeTable(meta::TableSchema& table_schema) { + return pMeta_->DescribeTable(table_schema); } template -Status DBImpl::delete_vectors(const std::string& group_id, - const meta::DatesT& dates) { - return _pMeta->delete_group_partitions(group_id, dates); +Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) { + return pMeta_->HasTable(table_id, has_or_not); } template -Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) { - return _pMeta->has_group(group_id_, has_or_not_); -} - -template -Status DBImpl::get_group_files(const std::string& group_id, - const int date_delta, - meta::GroupFilesSchema& group_files_info) { - return _pMeta->get_group_files(group_id, date_delta, group_files_info); - -} - -template -Status DBImpl::add_vectors(const std::string& group_id_, +Status DBImpl::InsertVectors(const std::string& table_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) { - Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_); + Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_); if (!status.ok()) { return status; } } template -Status DBImpl::search(const std::string &group_id, size_t k, size_t nq, +Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq, const float *vectors, QueryResults &results) { meta::DatesT dates = {meta::Meta::GetDate()}; - return search(group_id, k, nq, vectors, dates, results); + return Query(table_id, k, nq, vectors, dates, results); } template -Status DBImpl::search(const std::string& group_id, size_t k, size_t nq, +Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) { - meta::DatePartionedGroupFilesSchema files; - auto status = _pMeta->files_to_search(group_id, dates, files); + meta::DatePartionedTableFilesSchema files; + auto status = pMeta_->FilesToSearch(table_id, dates, files); if (!status.ok()) { return status; } LOG(DEBUG) << "Search DateT Size=" << files.size(); - meta::GroupFilesSchema index_files; - meta::GroupFilesSchema raw_files; + meta::TableFilesSchema index_files; + meta::TableFilesSchema raw_files; for (auto &day_files : files) { for (auto &file : day_files.second) { - file.file_type == meta::GroupFileSchema::INDEX ? + file.file_type == meta::TableFileSchema::INDEX ? index_files.push_back(file) : raw_files.push_back(file); } } @@ -132,7 +117,7 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq, long search_set_size = 0; - auto search_in_index = [&](meta::GroupFilesSchema& file_vec) -> void { + auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void { for (auto &file : file_vec) { EngineT index(file.dimension, file.location); index.Load(); @@ -204,98 +189,98 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq, } if (results.empty()) { - return Status::NotFound("Group " + group_id + ", search result not found!"); + return Status::NotFound("Group " + table_id + ", search result not found!"); } return Status::OK(); } template -void DBImpl::start_timer_task(int interval_) { - bg_timer_thread_ = std::thread(&DBImpl::background_timer_task, this, interval_); +void DBImpl::StartTimerTasks(int interval) { + bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval); } template -void DBImpl::background_timer_task(int interval_) { +void DBImpl::BackgroundTimerTask(int interval) { Status status; while (true) { - if (!_bg_error.ok()) break; - if (_shutting_down.load(std::memory_order_acquire)) break; + if (!bg_error_.ok()) break; + if (shutting_down_.load(std::memory_order_acquire)) break; - std::this_thread::sleep_for(std::chrono::seconds(interval_)); + std::this_thread::sleep_for(std::chrono::seconds(interval)); - try_schedule_compaction(); + TrySchedule(); } } template -void DBImpl::try_schedule_compaction() { - if (_bg_compaction_scheduled) return; - if (!_bg_error.ok()) return; +void DBImpl::TrySchedule() { + if (bg_compaction_scheduled_) return; + if (!bg_error_.ok()) return; - _bg_compaction_scheduled = true; - _env->schedule(&DBImpl::BGWork, this); + bg_compaction_scheduled_ = true; + env_->Schedule(&DBImpl::BGWork, this); } template void DBImpl::BGWork(void* db_) { - reinterpret_cast(db_)->background_call(); + reinterpret_cast(db_)->BackgroundCall(); } template -void DBImpl::background_call() { - std::lock_guard lock(_mutex); - assert(_bg_compaction_scheduled); +void DBImpl::BackgroundCall() { + std::lock_guard lock(mutex_); + assert(bg_compaction_scheduled_); - if (!_bg_error.ok() || _shutting_down.load(std::memory_order_acquire)) + if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire)) return ; - background_compaction(); + BackgroundCompaction(); - _bg_compaction_scheduled = false; - _bg_work_finish_signal.notify_all(); + bg_compaction_scheduled_ = false; + bg_work_finish_signal_.notify_all(); } template -Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, - const meta::GroupFilesSchema& files) { - meta::GroupFileSchema group_file; - group_file.group_id = group_id; - group_file.date = date; - Status status = _pMeta->add_group_file(group_file); +Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, + const meta::TableFilesSchema& files) { + meta::TableFileSchema table_file; + table_file.table_id = table_id; + table_file.date = date; + Status status = pMeta_->CreateTableFile(table_file); if (!status.ok()) { LOG(INFO) << status.ToString() << std::endl; return status; } - EngineT index(group_file.dimension, group_file.location); + EngineT index(table_file.dimension, table_file.location); - meta::GroupFilesSchema updated; + meta::TableFilesSchema updated; long index_size = 0; for (auto& file : files) { index.Merge(file.location); auto file_schema = file; - file_schema.file_type = meta::GroupFileSchema::TO_DELETE; + file_schema.file_type = meta::TableFileSchema::TO_DELETE; updated.push_back(file_schema); LOG(DEBUG) << "Merging file " << file_schema.file_id; index_size = index.Size(); - if (index_size >= _options.index_trigger_size) break; + if (index_size >= options_.index_trigger_size) break; } index.Serialize(); - if (index_size >= _options.index_trigger_size) { - group_file.file_type = meta::GroupFileSchema::TO_INDEX; + if (index_size >= options_.index_trigger_size) { + table_file.file_type = meta::TableFileSchema::TO_INDEX; } else { - group_file.file_type = meta::GroupFileSchema::RAW; + table_file.file_type = meta::TableFileSchema::RAW; } - group_file.size = index_size; - updated.push_back(group_file); - status = _pMeta->update_files(updated); - LOG(DEBUG) << "New merged file " << group_file.file_id << + table_file.size = index_size; + updated.push_back(table_file); + status = pMeta_->UpdateTableFiles(updated); + LOG(DEBUG) << "New merged file " << table_file.file_id << " of size=" << index.PhysicalSize()/(1024*1024) << " M"; index.Cache(); @@ -304,43 +289,39 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::Dat } template -Status DBImpl::background_merge_files(const std::string& group_id) { - meta::DatePartionedGroupFilesSchema raw_files; - auto status = _pMeta->files_to_merge(group_id, raw_files); +Status DBImpl::BackgroundMergeFiles(const std::string& table_id) { + meta::DatePartionedTableFilesSchema raw_files; + auto status = pMeta_->FilesToMerge(table_id, raw_files); if (!status.ok()) { return status; } - /* if (raw_files.size() == 0) { */ - /* return Status::OK(); */ - /* } */ - bool has_merge = false; for (auto& kv : raw_files) { auto files = kv.second; - if (files.size() <= _options.merge_trigger_number) { + if (files.size() <= options_.merge_trigger_number) { continue; } has_merge = true; - merge_files(group_id, kv.first, kv.second); + MergeFiles(table_id, kv.first, kv.second); } - _pMeta->archive_files(); + pMeta_->Archive(); - try_build_index(); + TryBuildIndex(); - _pMeta->cleanup_ttl_files(1); + pMeta_->CleanUpFilesWithTTL(1); return Status::OK(); } template -Status DBImpl::build_index(const meta::GroupFileSchema& file) { - meta::GroupFileSchema group_file; - group_file.group_id = file.group_id; - group_file.date = file.date; - Status status = _pMeta->add_group_file(group_file); +Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { + meta::TableFileSchema table_file; + table_file.table_id = file.table_id; + table_file.date = file.date; + Status status = pMeta_->CreateTableFile(table_file); if (!status.ok()) { return status; } @@ -348,39 +329,39 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { EngineT to_index(file.dimension, file.location); to_index.Load(); - auto index = to_index.BuildIndex(group_file.location); + auto index = to_index.BuildIndex(table_file.location); - group_file.file_type = meta::GroupFileSchema::INDEX; - group_file.size = index->Size(); + table_file.file_type = meta::TableFileSchema::INDEX; + table_file.size = index->Size(); auto to_remove = file; - to_remove.file_type = meta::GroupFileSchema::TO_DELETE; + to_remove.file_type = meta::TableFileSchema::TO_DELETE; - meta::GroupFilesSchema update_files = {to_remove, group_file}; - _pMeta->update_files(update_files); + meta::TableFilesSchema update_files = {to_remove, table_file}; + pMeta_->UpdateTableFiles(update_files); - LOG(DEBUG) << "New index file " << group_file.file_id << " of size " + LOG(DEBUG) << "New index file " << table_file.file_id << " of size " << index->PhysicalSize()/(1024*1024) << " M" << " from file " << to_remove.file_id; index->Cache(); - _pMeta->archive_files(); + pMeta_->Archive(); return Status::OK(); } template -void DBImpl::background_build_index() { +void DBImpl::BackgroundBuildIndex() { std::lock_guard lock(build_index_mutex_); assert(bg_build_index_started_); - meta::GroupFilesSchema to_index_files; - _pMeta->files_to_index(to_index_files); + meta::TableFilesSchema to_index_files; + pMeta_->FilesToIndex(to_index_files); Status status; for (auto& file : to_index_files) { /* LOG(DEBUG) << "Buiding index for " << file.location; */ - status = build_index(file); + status = BuildIndex(file); if (!status.ok()) { - _bg_error = status; + bg_error_ = status; return; } } @@ -391,52 +372,47 @@ void DBImpl::background_build_index() { } template -Status DBImpl::try_build_index() { +Status DBImpl::TryBuildIndex() { if (bg_build_index_started_) return Status::OK(); - if (_shutting_down.load(std::memory_order_acquire)) return Status::OK(); + if (shutting_down_.load(std::memory_order_acquire)) return Status::OK(); bg_build_index_started_ = true; - std::thread build_index_task(&DBImpl::background_build_index, this); + std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this); build_index_task.detach(); return Status::OK(); } template -void DBImpl::background_compaction() { - std::vector group_ids; - _pMemMgr->serialize(group_ids); +void DBImpl::BackgroundCompaction() { + std::vector table_ids; + pMemMgr_->Serialize(table_ids); Status status; - for (auto group_id : group_ids) { - status = background_merge_files(group_id); + for (auto table_id : table_ids) { + status = BackgroundMergeFiles(table_id); if (!status.ok()) { - _bg_error = status; + bg_error_ = status; return; } } } template -Status DBImpl::drop_all() { - return _pMeta->drop_all(); +Status DBImpl::DropAll() { + return pMeta_->DropAll(); } template -Status DBImpl::count(const std::string& group_id, long& result) { - return _pMeta->count(group_id, result); -} - -template -Status DBImpl::size(long& result) { - return _pMeta->size(result); +Status DBImpl::Size(long& result) { + return pMeta_->Size(result); } template DBImpl::~DBImpl() { { - std::unique_lock lock(_mutex); - _shutting_down.store(true, std::memory_order_release); - while (_bg_compaction_scheduled) { - _bg_work_finish_signal.wait(lock); + std::unique_lock lock(mutex_); + shutting_down_.store(true, std::memory_order_release); + while (bg_compaction_scheduled_) { + bg_work_finish_signal_.wait(lock); } } { @@ -447,12 +423,10 @@ DBImpl::~DBImpl() { } bg_timer_thread_.join(); std::vector ids; - _pMemMgr->serialize(ids); - _env->Stop(); + pMemMgr_->Serialize(ids); + env_->Stop(); } } // namespace engine } // namespace vecwise } // namespace zilliz - -#endif diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 9b4e731f74..69f0fc56a5 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -3,6 +3,10 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ +#include "DBMetaImpl.h" +#include "IDGenerator.h" +#include "Utils.h" +#include "MetaConsts.h" #include #include @@ -13,11 +17,6 @@ #include #include -#include "DBMetaImpl.h" -#include "IDGenerator.h" -#include "Utils.h" -#include "MetaConsts.h" - namespace zilliz { namespace vecwise { namespace engine { @@ -27,21 +26,21 @@ using namespace sqlite_orm; inline auto StoragePrototype(const std::string& path) { return make_storage(path, - make_table("Group", - make_column("id", &GroupSchema::id, primary_key()), - make_column("group_id", &GroupSchema::group_id, unique()), - make_column("dimension", &GroupSchema::dimension), - make_column("created_on", &GroupSchema::created_on), - make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))), - make_table("GroupFile", - make_column("id", &GroupFileSchema::id, primary_key()), - make_column("group_id", &GroupFileSchema::group_id), - make_column("file_id", &GroupFileSchema::file_id), - make_column("file_type", &GroupFileSchema::file_type), - make_column("size", &GroupFileSchema::size, default_value(0)), - make_column("updated_time", &GroupFileSchema::updated_time), - make_column("created_on", &GroupFileSchema::created_on), - make_column("date", &GroupFileSchema::date)) + make_table("Table", + make_column("id", &TableSchema::id, primary_key()), + make_column("table_id", &TableSchema::table_id, unique()), + make_column("dimension", &TableSchema::dimension), + make_column("created_on", &TableSchema::created_on), + make_column("files_cnt", &TableSchema::files_cnt, default_value(0))), + make_table("TableFile", + make_column("id", &TableFileSchema::id, primary_key()), + make_column("table_id", &TableFileSchema::table_id), + make_column("file_id", &TableFileSchema::file_id), + make_column("file_type", &TableFileSchema::file_type), + make_column("size", &TableFileSchema::size, default_value(0)), + make_column("updated_time", &TableFileSchema::updated_time), + make_column("created_on", &TableFileSchema::created_on), + make_column("date", &TableFileSchema::date)) ); } @@ -49,77 +48,77 @@ inline auto StoragePrototype(const std::string& path) { using ConnectorT = decltype(StoragePrototype("")); static std::unique_ptr ConnectorPtr; -std::string DBMetaImpl::GetGroupPath(const std::string& group_id) { - return _options.path + "/tables/" + group_id; +std::string DBMetaImpl::GetTablePath(const std::string& table_id) { + return options_.path + "/tables/" + table_id; } -std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) { +std::string DBMetaImpl::GetTableDatePartitionPath(const std::string& table_id, DateT& date) { std::stringstream ss; - ss << GetGroupPath(group_id) << "/" << date; + ss << GetTablePath(table_id) << "/" << date; return ss.str(); } -void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) { +void DBMetaImpl::GetTableFilePath(TableFileSchema& group_file) { if (group_file.date == EmptyDate) { group_file.date = Meta::GetDate(); } std::stringstream ss; - ss << GetGroupDatePartitionPath(group_file.group_id, group_file.date) + ss << GetTableDatePartitionPath(group_file.table_id, group_file.date) << "/" << group_file.file_id; group_file.location = ss.str(); } -Status DBMetaImpl::NextGroupId(std::string& group_id) { +Status DBMetaImpl::NextTableId(std::string& table_id) { std::stringstream ss; SimpleIDGenerator g; - ss << g.getNextIDNumber(); - group_id = ss.str(); + ss << g.GetNextIDNumber(); + table_id = ss.str(); return Status::OK(); } Status DBMetaImpl::NextFileId(std::string& file_id) { std::stringstream ss; SimpleIDGenerator g; - ss << g.getNextIDNumber(); + ss << g.GetNextIDNumber(); file_id = ss.str(); return Status::OK(); } DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_) - : _options(options_) { - initialize(); + : options_(options_) { + Initialize(); } -Status DBMetaImpl::initialize() { - if (!boost::filesystem::is_directory(_options.path)) { - auto ret = boost::filesystem::create_directory(_options.path); +Status DBMetaImpl::Initialize() { + if (!boost::filesystem::is_directory(options_.path)) { + auto ret = boost::filesystem::create_directory(options_.path); if (!ret) { - LOG(ERROR) << "Create directory " << _options.path << " Error"; + LOG(ERROR) << "Create directory " << options_.path << " Error"; } assert(ret); } - ConnectorPtr = std::make_unique(StoragePrototype(_options.path+"/meta.sqlite")); + ConnectorPtr = std::make_unique(StoragePrototype(options_.path+"/meta.sqlite")); ConnectorPtr->sync_schema(); ConnectorPtr->open_forever(); // thread safe option ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log - cleanup(); + CleanUp(); return Status::OK(); } // PXU TODO: Temp solution. Will fix later -Status DBMetaImpl::delete_group_partitions(const std::string& group_id, - const meta::DatesT& dates) { +Status DBMetaImpl::DropPartitionsByDates(const std::string& table_id, + const DatesT& dates) { if (dates.size() == 0) { return Status::OK(); } - GroupSchema group_info; - group_info.group_id = group_id; - auto status = get_group(group_info); + TableSchema table_schema; + table_schema.table_id = table_id; + auto status = DescribeTable(table_schema); if (!status.ok()) { return status; } @@ -135,11 +134,11 @@ Status DBMetaImpl::delete_group_partitions(const std::string& group_id, try { ConnectorPtr->update_all( set( - c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE + c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE ), where( - c(&GroupFileSchema::group_id) == group_id and - in(&GroupFileSchema::date, dates) + c(&TableFileSchema::table_id) == table_id and + in(&TableFileSchema::date, dates) )); } catch (std::exception & e) { LOG(DEBUG) << e.what(); @@ -148,24 +147,24 @@ Status DBMetaImpl::delete_group_partitions(const std::string& group_id, return Status::OK(); } -Status DBMetaImpl::add_group(GroupSchema& group_info) { - if (group_info.group_id == "") { - NextGroupId(group_info.group_id); +Status DBMetaImpl::CreateTable(TableSchema& table_schema) { + if (table_schema.table_id == "") { + NextTableId(table_schema.table_id); } - group_info.files_cnt = 0; - group_info.id = -1; - group_info.created_on = utils::GetMicroSecTimeStamp(); + table_schema.files_cnt = 0; + table_schema.id = -1; + table_schema.created_on = utils::GetMicroSecTimeStamp(); { try { - auto id = ConnectorPtr->insert(group_info); - group_info.id = id; + auto id = ConnectorPtr->insert(table_schema); + table_schema.id = id; } catch (...) { - return Status::DBTransactionError("Add Group Error"); + return Status::DBTransactionError("Add Table Error"); } } - auto group_path = GetGroupPath(group_info.group_id); + auto group_path = GetTablePath(table_schema.table_id); if (!boost::filesystem::is_directory(group_path)) { auto ret = boost::filesystem::create_directories(group_path); @@ -178,24 +177,20 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) { return Status::OK(); } -Status DBMetaImpl::get_group(GroupSchema& group_info) { - return get_group_no_lock(group_info); -} - -Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) { +Status DBMetaImpl::DescribeTable(TableSchema& table_schema) { try { - auto groups = ConnectorPtr->select(columns(&GroupSchema::id, - &GroupSchema::group_id, - &GroupSchema::files_cnt, - &GroupSchema::dimension), - where(c(&GroupSchema::group_id) == group_info.group_id)); + auto groups = ConnectorPtr->select(columns(&TableSchema::id, + &TableSchema::table_id, + &TableSchema::files_cnt, + &TableSchema::dimension), + where(c(&TableSchema::table_id) == table_schema.table_id)); assert(groups.size() <= 1); if (groups.size() == 1) { - group_info.id = std::get<0>(groups[0]); - group_info.files_cnt = std::get<2>(groups[0]); - group_info.dimension = std::get<3>(groups[0]); + table_schema.id = std::get<0>(groups[0]); + table_schema.files_cnt = std::get<2>(groups[0]); + table_schema.dimension = std::get<3>(groups[0]); } else { - return Status::NotFound("Group " + group_info.group_id + " not found"); + return Status::NotFound("Table " + table_schema.table_id + " not found"); } } catch (std::exception &e) { LOG(DEBUG) << e.what(); @@ -205,12 +200,12 @@ Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) { return Status::OK(); } -Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) { +Status DBMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) { try { - auto groups = ConnectorPtr->select(columns(&GroupSchema::id), - where(c(&GroupSchema::group_id) == group_id)); - assert(groups.size() <= 1); - if (groups.size() == 1) { + auto tables = ConnectorPtr->select(columns(&TableSchema::id), + where(c(&TableSchema::table_id) == table_id)); + assert(tables.size() <= 1); + if (tables.size() == 1) { has_or_not = true; } else { has_or_not = false; @@ -222,35 +217,35 @@ Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) { return Status::OK(); } -Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { - if (group_file.date == EmptyDate) { - group_file.date = Meta::GetDate(); +Status DBMetaImpl::CreateTableFile(TableFileSchema& file_schema) { + if (file_schema.date == EmptyDate) { + file_schema.date = Meta::GetDate(); } - GroupSchema group_info; - group_info.group_id = group_file.group_id; - auto status = get_group(group_info); + TableSchema table_schema; + table_schema.table_id = file_schema.table_id; + auto status = DescribeTable(table_schema); if (!status.ok()) { return status; } - NextFileId(group_file.file_id); - group_file.file_type = GroupFileSchema::NEW; - group_file.dimension = group_info.dimension; - group_file.size = 0; - group_file.created_on = utils::GetMicroSecTimeStamp(); - group_file.updated_time = group_file.created_on; - GetGroupFilePath(group_file); + NextFileId(file_schema.file_id); + file_schema.file_type = TableFileSchema::NEW; + file_schema.dimension = table_schema.dimension; + file_schema.size = 0; + file_schema.created_on = utils::GetMicroSecTimeStamp(); + file_schema.updated_time = file_schema.created_on; + GetTableFilePath(file_schema); { try { - auto id = ConnectorPtr->insert(group_file); - group_file.id = id; + auto id = ConnectorPtr->insert(file_schema); + file_schema.id = id; } catch (...) { return Status::DBTransactionError("Add file Error"); } } - auto partition_path = GetGroupDatePartitionPath(group_file.group_id, group_file.date); + auto partition_path = GetTableDatePartitionPath(file_schema.table_id, file_schema.date); if (!boost::filesystem::is_directory(partition_path)) { auto ret = boost::filesystem::create_directory(partition_path); @@ -263,41 +258,41 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { return Status::OK(); } -Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { +Status DBMetaImpl::FilesToIndex(TableFilesSchema& files) { files.clear(); try { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::size, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX)); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::table_id, + &TableFileSchema::file_id, + &TableFileSchema::file_type, + &TableFileSchema::size, + &TableFileSchema::date), + where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_INDEX)); - std::map groups; + std::map groups; + TableFileSchema table_file; for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.size = std::get<4>(file); - group_file.date = std::get<5>(file); - GetGroupFilePath(group_file); - auto groupItr = groups.find(group_file.group_id); + table_file.id = std::get<0>(file); + table_file.table_id = std::get<1>(file); + table_file.file_id = std::get<2>(file); + table_file.file_type = std::get<3>(file); + table_file.size = std::get<4>(file); + table_file.date = std::get<5>(file); + GetTableFilePath(table_file); + auto groupItr = groups.find(table_file.table_id); if (groupItr == groups.end()) { - GroupSchema group_info; - group_info.group_id = group_file.group_id; - auto status = get_group_no_lock(group_info); + TableSchema table_schema; + table_schema.table_id = table_file.table_id; + auto status = DescribeTable(table_schema); if (!status.ok()) { return status; } - groups[group_file.group_id] = group_info; + groups[table_file.table_id] = table_schema; } - group_file.dimension = groups[group_file.group_id].dimension; - files.push_back(group_file); + table_file.dimension = groups[table_file.table_id].dimension; + files.push_back(table_file); } } catch (std::exception & e) { LOG(DEBUG) << e.what(); @@ -307,48 +302,49 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { return Status::OK(); } -Status DBMetaImpl::files_to_search(const std::string &group_id, +Status DBMetaImpl::FilesToSearch(const std::string &table_id, const DatesT& partition, - DatePartionedGroupFilesSchema &files) { + DatePartionedTableFilesSchema &files) { files.clear(); DatesT today = {Meta::GetDate()}; const DatesT& dates = (partition.empty() == true) ? today : partition; try { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::size, - &GroupFileSchema::date), - where(c(&GroupFileSchema::group_id) == group_id and - in(&GroupFileSchema::date, dates) and - (c(&GroupFileSchema::file_type) == (int) GroupFileSchema::RAW or - c(&GroupFileSchema::file_type) == (int) GroupFileSchema::TO_INDEX or - c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX))); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::table_id, + &TableFileSchema::file_id, + &TableFileSchema::file_type, + &TableFileSchema::size, + &TableFileSchema::date), + where(c(&TableFileSchema::table_id) == table_id and + in(&TableFileSchema::date, dates) and + (c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or + c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_INDEX or + c(&TableFileSchema::file_type) == (int) TableFileSchema::INDEX))); - GroupSchema group_info; - group_info.group_id = group_id; - auto status = get_group_no_lock(group_info); + TableSchema table_schema; + table_schema.table_id = table_id; + auto status = DescribeTable(table_schema); if (!status.ok()) { return status; } + TableFileSchema table_file; + for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.size = std::get<4>(file); - group_file.date = std::get<5>(file); - group_file.dimension = group_info.dimension; - GetGroupFilePath(group_file); - auto dateItr = files.find(group_file.date); + table_file.id = std::get<0>(file); + table_file.table_id = std::get<1>(file); + table_file.file_id = std::get<2>(file); + table_file.file_type = std::get<3>(file); + table_file.size = std::get<4>(file); + table_file.date = std::get<5>(file); + table_file.dimension = table_schema.dimension; + GetTableFilePath(table_file); + auto dateItr = files.find(table_file.date); if (dateItr == files.end()) { - files[group_file.date] = GroupFilesSchema(); + files[table_file.date] = TableFilesSchema(); } - files[group_file.date].push_back(group_file); + files[table_file.date].push_back(table_file); } } catch (std::exception & e) { LOG(DEBUG) << e.what(); @@ -358,42 +354,42 @@ Status DBMetaImpl::files_to_search(const std::string &group_id, return Status::OK(); } -Status DBMetaImpl::files_to_merge(const std::string& group_id, - DatePartionedGroupFilesSchema& files) { +Status DBMetaImpl::FilesToMerge(const std::string& table_id, + DatePartionedTableFilesSchema& files) { files.clear(); try { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::size, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and - c(&GroupFileSchema::group_id) == group_id)); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::table_id, + &TableFileSchema::file_id, + &TableFileSchema::file_type, + &TableFileSchema::size, + &TableFileSchema::date), + where(c(&TableFileSchema::file_type) == (int)TableFileSchema::RAW and + c(&TableFileSchema::table_id) == table_id)); - GroupSchema group_info; - group_info.group_id = group_id; - auto status = get_group_no_lock(group_info); + TableSchema table_schema; + table_schema.table_id = table_id; + auto status = DescribeTable(table_schema); if (!status.ok()) { return status; } + TableFileSchema table_file; for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.size = std::get<4>(file); - group_file.date = std::get<5>(file); - group_file.dimension = group_info.dimension; - GetGroupFilePath(group_file); - auto dateItr = files.find(group_file.date); + table_file.id = std::get<0>(file); + table_file.table_id = std::get<1>(file); + table_file.file_id = std::get<2>(file); + table_file.file_type = std::get<3>(file); + table_file.size = std::get<4>(file); + table_file.date = std::get<5>(file); + table_file.dimension = table_schema.dimension; + GetTableFilePath(table_file); + auto dateItr = files.find(table_file.date); if (dateItr == files.end()) { - files[group_file.date] = GroupFilesSchema(); + files[table_file.date] = TableFilesSchema(); } - files[group_file.date].push_back(group_file); + files[table_file.date].push_back(table_file); } } catch (std::exception & e) { LOG(DEBUG) << e.what(); @@ -403,36 +399,29 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id, return Status::OK(); } -Status DBMetaImpl::has_group_file(const std::string& group_id_, - const std::string& file_id_, - bool& has_or_not_) { - //PXU TODO - return Status::OK(); -} +Status DBMetaImpl::GetTableFile(TableFileSchema& file_schema) { -Status DBMetaImpl::get_group_file(const std::string& group_id_, - const std::string& file_id_, - GroupFileSchema& group_file_info_) { try { - auto files = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::size, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_id) == file_id_ and - c(&GroupFileSchema::group_id) == group_id_ + auto files = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::table_id, + &TableFileSchema::file_id, + &TableFileSchema::file_type, + &TableFileSchema::size, + &TableFileSchema::date), + where(c(&TableFileSchema::file_id) == file_schema.file_id and + c(&TableFileSchema::table_id) == file_schema.table_id )); assert(files.size() <= 1); if (files.size() == 1) { - group_file_info_.id = std::get<0>(files[0]); - group_file_info_.group_id = std::get<1>(files[0]); - group_file_info_.file_id = std::get<2>(files[0]); - group_file_info_.file_type = std::get<3>(files[0]); - group_file_info_.size = std::get<4>(files[0]); - group_file_info_.date = std::get<5>(files[0]); + file_schema.id = std::get<0>(files[0]); + file_schema.table_id = std::get<1>(files[0]); + file_schema.file_id = std::get<2>(files[0]); + file_schema.file_type = std::get<3>(files[0]); + file_schema.size = std::get<4>(files[0]); + file_schema.date = std::get<5>(files[0]); } else { - return Status::NotFound("GroupFile " + file_id_ + " not found"); + return Status::NotFound("Table:" + file_schema.table_id + + " File:" + file_schema.file_id + " not found"); } } catch (std::exception &e) { LOG(DEBUG) << e.what(); @@ -442,16 +431,9 @@ Status DBMetaImpl::get_group_file(const std::string& group_id_, return Status::OK(); } -Status DBMetaImpl::get_group_files(const std::string& group_id_, - const int date_delta_, - GroupFilesSchema& group_files_info_) { - // PXU TODO - return Status::OK(); -} - // PXU TODO: Support Swap -Status DBMetaImpl::archive_files() { - auto& criterias = _options.archive_conf.GetCriterias(); +Status DBMetaImpl::Archive() { + auto& criterias = options_.archive_conf.GetCriterias(); if (criterias.size() == 0) { return Status::OK(); } @@ -466,11 +448,11 @@ Status DBMetaImpl::archive_files() { { ConnectorPtr->update_all( set( - c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE + c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE ), where( - c(&GroupFileSchema::created_on) < (long)(now - usecs) and - c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE + c(&TableFileSchema::created_on) < (long)(now - usecs) and + c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE )); } catch (std::exception & e) { LOG(DEBUG) << e.what(); @@ -479,23 +461,22 @@ Status DBMetaImpl::archive_files() { } if (criteria == "disk") { long sum = 0; - size(sum); + Size(sum); - // PXU TODO: refactor size auto to_delete = (sum - limit*G); - discard_files_of_size(to_delete); + DiscardFiles(to_delete); } } return Status::OK(); } -Status DBMetaImpl::size(long& result) { +Status DBMetaImpl::Size(long& result) { result = 0; try { - auto selected = ConnectorPtr->select(columns(sum(&GroupFileSchema::size)), + auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size)), where( - c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE + c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE )); for (auto& sub_query : selected) { @@ -512,27 +493,28 @@ Status DBMetaImpl::size(long& result) { return Status::OK(); } -Status DBMetaImpl::discard_files_of_size(long to_discard_size) { - LOG(DEBUG) << "Abort to discard size=" << to_discard_size; +Status DBMetaImpl::DiscardFiles(long to_discard_size) { + LOG(DEBUG) << "About to discard size=" << to_discard_size; if (to_discard_size <= 0) { return Status::OK(); } try { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::size), - where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE), - order_by(&GroupFileSchema::id), - limit(10)); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::size), + where(c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE), + order_by(&TableFileSchema::id), + limit(10)); + std::vector ids; + TableFileSchema table_file; for (auto& file : selected) { if (to_discard_size <= 0) break; - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.size = std::get<1>(file); - ids.push_back(group_file.id); - LOG(DEBUG) << "Discard group_file.id=" << group_file.id << " group_file.size=" << group_file.size; - to_discard_size -= group_file.size; + table_file.id = std::get<0>(file); + table_file.size = std::get<1>(file); + ids.push_back(table_file.id); + LOG(DEBUG) << "Discard table_file.id=" << table_file.file_id << " table_file.size=" << table_file.size; + to_discard_size -= table_file.size; } if (ids.size() == 0) { @@ -541,10 +523,10 @@ Status DBMetaImpl::discard_files_of_size(long to_discard_size) { ConnectorPtr->update_all( set( - c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE + c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE ), where( - in(&GroupFileSchema::id, ids) + in(&TableFileSchema::id, ids) )); } catch (std::exception & e) { @@ -553,22 +535,22 @@ Status DBMetaImpl::discard_files_of_size(long to_discard_size) { } - return discard_files_of_size(to_discard_size); + return DiscardFiles(to_discard_size); } -Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { - group_file.updated_time = utils::GetMicroSecTimeStamp(); +Status DBMetaImpl::UpdateTableFile(TableFileSchema& file_schema) { + file_schema.updated_time = utils::GetMicroSecTimeStamp(); try { - ConnectorPtr->update(group_file); + ConnectorPtr->update(file_schema); } catch (std::exception & e) { LOG(DEBUG) << e.what(); - LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id; + LOG(DEBUG) << "table_id= " << file_schema.table_id << " file_id=" << file_schema.file_id; throw e; } return Status::OK(); } -Status DBMetaImpl::update_files(GroupFilesSchema& files) { +Status DBMetaImpl::UpdateTableFiles(TableFilesSchema& files) { try { auto commited = ConnectorPtr->transaction([&] () mutable { for (auto& file : files) { @@ -587,34 +569,34 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) { return Status::OK(); } -Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) { +Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { auto now = utils::GetMicroSecTimeStamp(); try { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::size, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and - c(&GroupFileSchema::updated_time) > now - seconds*US_PS)); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::table_id, + &TableFileSchema::file_id, + &TableFileSchema::file_type, + &TableFileSchema::size, + &TableFileSchema::date), + where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_DELETE and + c(&TableFileSchema::updated_time) > now - seconds*US_PS)); - GroupFilesSchema updated; + TableFilesSchema updated; + TableFileSchema table_file; for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.size = std::get<4>(file); - group_file.date = std::get<5>(file); - GetGroupFilePath(group_file); - if (group_file.file_type == GroupFileSchema::TO_DELETE) { - boost::filesystem::remove(group_file.location); + table_file.id = std::get<0>(file); + table_file.table_id = std::get<1>(file); + table_file.file_id = std::get<2>(file); + table_file.file_type = std::get<3>(file); + table_file.size = std::get<4>(file); + table_file.date = std::get<5>(file); + GetTableFilePath(table_file); + if (table_file.file_type == TableFileSchema::TO_DELETE) { + boost::filesystem::remove(table_file.location); } - ConnectorPtr->remove(group_file.id); - /* LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; */ + ConnectorPtr->remove(table_file.id); + /* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */ } } catch (std::exception & e) { LOG(DEBUG) << e.what(); @@ -624,33 +606,33 @@ Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) { return Status::OK(); } -Status DBMetaImpl::cleanup() { +Status DBMetaImpl::CleanUp() { try { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, - &GroupFileSchema::group_id, - &GroupFileSchema::file_id, - &GroupFileSchema::file_type, - &GroupFileSchema::size, - &GroupFileSchema::date), - where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or - c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW)); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::table_id, + &TableFileSchema::file_id, + &TableFileSchema::file_type, + &TableFileSchema::size, + &TableFileSchema::date), + where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_DELETE or + c(&TableFileSchema::file_type) == (int)TableFileSchema::NEW)); - GroupFilesSchema updated; + TableFilesSchema updated; + TableFileSchema table_file; for (auto& file : selected) { - GroupFileSchema group_file; - group_file.id = std::get<0>(file); - group_file.group_id = std::get<1>(file); - group_file.file_id = std::get<2>(file); - group_file.file_type = std::get<3>(file); - group_file.size = std::get<4>(file); - group_file.date = std::get<5>(file); - GetGroupFilePath(group_file); - if (group_file.file_type == GroupFileSchema::TO_DELETE) { - boost::filesystem::remove(group_file.location); + table_file.id = std::get<0>(file); + table_file.table_id = std::get<1>(file); + table_file.file_id = std::get<2>(file); + table_file.file_type = std::get<3>(file); + table_file.size = std::get<4>(file); + table_file.date = std::get<5>(file); + GetTableFilePath(table_file); + if (table_file.file_type == TableFileSchema::TO_DELETE) { + boost::filesystem::remove(table_file.location); } - ConnectorPtr->remove(group_file.id); - /* LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; */ + ConnectorPtr->remove(table_file.id); + /* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */ } } catch (std::exception & e) { LOG(DEBUG) << e.what(); @@ -660,19 +642,19 @@ Status DBMetaImpl::cleanup() { return Status::OK(); } -Status DBMetaImpl::count(const std::string& group_id, long& result) { +Status DBMetaImpl::Count(const std::string& table_id, long& result) { try { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size, - &GroupFileSchema::date), - where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or - c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or - c(&GroupFileSchema::file_type) == (int)GroupFileSchema::INDEX) and - c(&GroupFileSchema::group_id) == group_id)); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::size, + &TableFileSchema::date), + where((c(&TableFileSchema::file_type) == (int)TableFileSchema::RAW or + c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_INDEX or + c(&TableFileSchema::file_type) == (int)TableFileSchema::INDEX) and + c(&TableFileSchema::table_id) == table_id)); - GroupSchema group_info; - group_info.group_id = group_id; - auto status = get_group_no_lock(group_info); + TableSchema table_schema; + table_schema.table_id = table_id; + auto status = DescribeTable(table_schema); if (!status.ok()) { return status; } @@ -682,7 +664,7 @@ Status DBMetaImpl::count(const std::string& group_id, long& result) { result += std::get<0>(file); } - result /= group_info.dimension; + result /= table_schema.dimension; } catch (std::exception & e) { LOG(DEBUG) << e.what(); @@ -691,15 +673,15 @@ Status DBMetaImpl::count(const std::string& group_id, long& result) { return Status::OK(); } -Status DBMetaImpl::drop_all() { - if (boost::filesystem::is_directory(_options.path)) { - boost::filesystem::remove_all(_options.path); +Status DBMetaImpl::DropAll() { + if (boost::filesystem::is_directory(options_.path)) { + boost::filesystem::remove_all(options_.path); } return Status::OK(); } DBMetaImpl::~DBMetaImpl() { - cleanup(); + CleanUp(); } } // namespace meta diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 6108860927..9710a3be90 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -19,62 +19,53 @@ class DBMetaImpl : public Meta { public: DBMetaImpl(const DBMetaOptions& options_); - virtual Status add_group(GroupSchema& group_info) override; - virtual Status get_group(GroupSchema& group_info_) override; - virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; + virtual Status CreateTable(TableSchema& table_schema) override; + virtual Status DescribeTable(TableSchema& group_info_) override; + virtual Status HasTable(const std::string& table_id, bool& has_or_not) override; - virtual Status add_group_file(GroupFileSchema& group_file_info) override; - virtual Status delete_group_partitions(const std::string& group_id, - const meta::DatesT& dates) override; + virtual Status CreateTableFile(TableFileSchema& file_schema) override; + virtual Status DropPartitionsByDates(const std::string& table_id, + const DatesT& dates) override; - virtual Status has_group_file(const std::string& group_id_, - const std::string& file_id_, - bool& has_or_not_) override; - virtual Status get_group_file(const std::string& group_id_, - const std::string& file_id_, - GroupFileSchema& group_file_info_) override; - virtual Status update_group_file(GroupFileSchema& group_file_) override; + virtual Status GetTableFile(TableFileSchema& file_schema) override; - virtual Status get_group_files(const std::string& group_id_, - const int date_delta_, - GroupFilesSchema& group_files_info_) override; + virtual Status UpdateTableFile(TableFileSchema& file_schema) override; - virtual Status update_files(GroupFilesSchema& files) override; + virtual Status UpdateTableFiles(TableFilesSchema& files) override; - virtual Status files_to_merge(const std::string& group_id, - DatePartionedGroupFilesSchema& files) override; - - virtual Status files_to_search(const std::string& group_id, + virtual Status FilesToSearch(const std::string& table_id, const DatesT& partition, - DatePartionedGroupFilesSchema& files) override; + DatePartionedTableFilesSchema& files) override; - virtual Status files_to_index(GroupFilesSchema&) override; + virtual Status FilesToMerge(const std::string& table_id, + DatePartionedTableFilesSchema& files) override; - virtual Status archive_files() override; + virtual Status FilesToIndex(TableFilesSchema&) override; - virtual Status size(long& result) override; + virtual Status Archive() override; - virtual Status cleanup() override; + virtual Status Size(long& result) override; - virtual Status cleanup_ttl_files(uint16_t seconds) override; + virtual Status CleanUp() override; - virtual Status drop_all() override; + virtual Status CleanUpFilesWithTTL(uint16_t seconds) override; - virtual Status count(const std::string& group_id, long& result) override; + virtual Status DropAll() override; + + virtual Status Count(const std::string& table_id, long& result) override; virtual ~DBMetaImpl(); private: Status NextFileId(std::string& file_id); - Status NextGroupId(std::string& group_id); - Status discard_files_of_size(long to_discard_size); - Status get_group_no_lock(GroupSchema& group_info); - std::string GetGroupPath(const std::string& group_id); - std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date); - void GetGroupFilePath(GroupFileSchema& group_file); - Status initialize(); + Status NextTableId(std::string& table_id); + Status DiscardFiles(long to_discard_size); + std::string GetTablePath(const std::string& table_id); + std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date); + void GetTableFilePath(TableFileSchema& group_file); + Status Initialize(); - const DBMetaOptions _options; + const DBMetaOptions options_; }; // DBMetaImpl } // namespace meta diff --git a/cpp/src/db/Env.cpp b/cpp/src/db/Env.cpp index f3a4b64810..5c5bcdb52a 100644 --- a/cpp/src/db/Env.cpp +++ b/cpp/src/db/Env.cpp @@ -13,66 +13,66 @@ namespace vecwise { namespace engine { Env::Env() - : _bg_work_started(false), - _shutting_down(false) { + : bg_work_started_(false), + shutting_down_(false) { } -void Env::schedule(void (*function_)(void* arg_), void* arg_) { - std::unique_lock lock(_bg_work_mutex); - if (_shutting_down) return; +void Env::Schedule(void (*function)(void* arg), void* arg) { + std::unique_lock lock(bg_work_mutex_); + if (shutting_down_) return; - if (!_bg_work_started) { - _bg_work_started = true; + if (!bg_work_started_) { + bg_work_started_ = true; std::thread bg_thread(Env::BackgroundThreadEntryPoint, this); bg_thread.detach(); } - if (_bg_work_queue.empty()) { - _bg_work_cv.notify_one(); + if (bg_work_queue_.empty()) { + bg_work_cv_.notify_one(); } - _bg_work_queue.emplace(function_, arg_); + bg_work_queue_.emplace(function, arg); } -void Env::backgroud_thread_main() { - while (!_shutting_down) { - std::unique_lock lock(_bg_work_mutex); - while (_bg_work_queue.empty() && !_shutting_down) { - _bg_work_cv.wait(lock); +void Env::BackgroundThreadMain() { + while (!shutting_down_) { + std::unique_lock lock(bg_work_mutex_); + while (bg_work_queue_.empty() && !shutting_down_) { + bg_work_cv_.wait(lock); } - if (_shutting_down) break; + if (shutting_down_) break; - assert(!_bg_work_queue.empty()); - auto bg_function = _bg_work_queue.front()._function; - void* bg_arg = _bg_work_queue.front()._arg; - _bg_work_queue.pop(); + assert(!bg_work_queue_.empty()); + auto bg_function = bg_work_queue_.front().function_; + void* bg_arg = bg_work_queue_.front().arg_; + bg_work_queue_.pop(); lock.unlock(); bg_function(bg_arg); } - std::unique_lock lock(_bg_work_mutex); - _bg_work_started = false; - _bg_work_cv.notify_all(); + std::unique_lock lock(bg_work_mutex_); + bg_work_started_ = false; + bg_work_cv_.notify_all(); } void Env::Stop() { { - std::unique_lock lock(_bg_work_mutex); - if (_shutting_down || !_bg_work_started) return; + std::unique_lock lock(bg_work_mutex_); + if (shutting_down_ || !bg_work_started_) return; } - _shutting_down = true; + shutting_down_ = true; { - std::unique_lock lock(_bg_work_mutex); - if (_bg_work_queue.empty()) { - _bg_work_cv.notify_one(); + std::unique_lock lock(bg_work_mutex_); + if (bg_work_queue_.empty()) { + bg_work_cv_.notify_one(); } - while (_bg_work_started) { - _bg_work_cv.wait(lock); + while (bg_work_started_) { + bg_work_cv_.wait(lock); } } - _shutting_down = false; + shutting_down_ = false; } Env::~Env() {} diff --git a/cpp/src/db/Env.h b/cpp/src/db/Env.h index 9eed68cddb..28ecaaacaf 100644 --- a/cpp/src/db/Env.h +++ b/cpp/src/db/Env.h @@ -22,7 +22,7 @@ public: Env(const Env&) = delete; Env& operator=(const Env&) = delete; - void schedule(void (*function_)(void* arg_), void* arg_); + void Schedule(void (*function)(void* arg), void* arg); virtual void Stop(); @@ -31,25 +31,24 @@ public: static Env* Default(); protected: - void backgroud_thread_main(); + void BackgroundThreadMain(); static void BackgroundThreadEntryPoint(Env* env) { - env->backgroud_thread_main(); + env->BackgroundThreadMain(); } struct BGWork { - explicit BGWork(void (*function_)(void*), void* arg_) - : _function(function_), _arg(arg_) {} + explicit BGWork(void (*function)(void*), void* arg) + : function_(function), arg_(arg) {} - void (* const _function)(void*); - void* const _arg; + void (* const function_)(void*); + void* const arg_; }; - std::mutex _bg_work_mutex; - std::condition_variable _bg_work_cv; - std::queue _bg_work_queue; - bool _bg_work_started; - std::atomic _shutting_down; - + std::mutex bg_work_mutex_; + std::condition_variable bg_work_cv_; + std::queue bg_work_queue_; + bool bg_work_started_; + std::atomic shutting_down_; }; // Env } // namespace engine diff --git a/cpp/src/db/ExecutionEngine.cpp b/cpp/src/db/ExecutionEngine.cpp index b689e6cfed..6eb04bb788 100644 --- a/cpp/src/db/ExecutionEngine.cpp +++ b/cpp/src/db/ExecutionEngine.cpp @@ -3,9 +3,10 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#include #include "ExecutionEngine.h" +#include + namespace zilliz { namespace vecwise { namespace engine { diff --git a/cpp/src/db/ExecutionEngine.h b/cpp/src/db/ExecutionEngine.h index 0b7af1b53e..6ad91959e8 100644 --- a/cpp/src/db/ExecutionEngine.h +++ b/cpp/src/db/ExecutionEngine.h @@ -5,11 +5,11 @@ ******************************************************************************/ #pragma once +#include "Status.h" + #include #include -#include "Status.h" - namespace zilliz { namespace vecwise { namespace engine { diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index 87aae6afbb..191d3dbae5 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -3,6 +3,11 @@ // Unauthorized copying of this file, via any medium is strictly prohibited. // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// +#include "Factories.h" +#include "DBImpl.h" +#include "FaissExecutionEngine.h" +#include "Traits.h" + #include #include #include @@ -11,12 +16,6 @@ #include #include -#include "Factories.h" -#include "DBImpl.h" -#include "FaissExecutionEngine.h" -#include "Traits.h" - - namespace zilliz { namespace vecwise { namespace engine { diff --git a/cpp/src/db/Factories.h b/cpp/src/db/Factories.h index 3c9c3a99aa..0e9ab71bb1 100644 --- a/cpp/src/db/Factories.h +++ b/cpp/src/db/Factories.h @@ -3,15 +3,15 @@ // Unauthorized copying of this file, via any medium is strictly prohibited. // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// - #pragma once -#include -#include #include "DB.h" #include "DBMetaImpl.h" #include "Options.h" +#include +#include + namespace zilliz { namespace vecwise { namespace engine { diff --git a/cpp/src/db/FaissExecutionEngine.h b/cpp/src/db/FaissExecutionEngine.h index 739e09efca..2915acc0eb 100644 --- a/cpp/src/db/FaissExecutionEngine.h +++ b/cpp/src/db/FaissExecutionEngine.h @@ -5,11 +5,11 @@ ******************************************************************************/ #pragma once +#include "ExecutionEngine.h" + #include #include -#include "ExecutionEngine.h" - namespace faiss { class Index; } @@ -22,7 +22,7 @@ namespace engine { template class FaissExecutionEngine : public ExecutionEngine> { public: - typedef std::shared_ptr> Ptr; + using Ptr = std::shared_ptr>; FaissExecutionEngine(uint16_t dimension, const std::string& location); FaissExecutionEngine(std::shared_ptr index, const std::string& location); @@ -53,7 +53,9 @@ public: Ptr BuildIndex(const std::string&); Status Cache(); + protected: + std::shared_ptr pIndex_; std::string location_; }; @@ -63,4 +65,4 @@ protected: } // namespace vecwise } // namespace zilliz -#include "FaissExecutionEngine.cpp" +#include "FaissExecutionEngine.inl" diff --git a/cpp/src/db/FaissExecutionEngine.cpp b/cpp/src/db/FaissExecutionEngine.inl similarity index 98% rename from cpp/src/db/FaissExecutionEngine.cpp rename to cpp/src/db/FaissExecutionEngine.inl index c2165c948d..961090f893 100644 --- a/cpp/src/db/FaissExecutionEngine.cpp +++ b/cpp/src/db/FaissExecutionEngine.inl @@ -3,8 +3,9 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#ifndef FAISSEXECUTIONENGINE_CPP__ -#define FAISSEXECUTIONENGINE_CPP__ +#pragma once + +#include "FaissExecutionEngine.h" #include #include @@ -15,8 +16,6 @@ #include #include -#include "FaissExecutionEngine.h" - namespace zilliz { namespace vecwise { namespace engine { @@ -135,5 +134,3 @@ Status FaissExecutionEngine::Cache() { } // namespace engine } // namespace vecwise } // namespace zilliz - -#endif diff --git a/cpp/src/db/IDGenerator.cpp b/cpp/src/db/IDGenerator.cpp index 633ee5fe69..d6e71091e6 100644 --- a/cpp/src/db/IDGenerator.cpp +++ b/cpp/src/db/IDGenerator.cpp @@ -3,30 +3,29 @@ // Unauthorized copying of this file, via any medium is strictly prohibited. // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// +#include "IDGenerator.h" + #include #include #include -#include "IDGenerator.h" - - namespace zilliz { namespace vecwise { namespace engine { IDGenerator::~IDGenerator() {} -IDNumber SimpleIDGenerator::getNextIDNumber() { +IDNumber SimpleIDGenerator::GetNextIDNumber() { auto now = std::chrono::system_clock::now(); auto micros = std::chrono::duration_cast( now.time_since_epoch()).count(); return micros * MAX_IDS_PER_MICRO; } -void SimpleIDGenerator::nextIDNumbers(size_t n, IDNumbers& ids) { +void SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) { if (n > MAX_IDS_PER_MICRO) { - nextIDNumbers(n-MAX_IDS_PER_MICRO, ids); - nextIDNumbers(MAX_IDS_PER_MICRO, ids); + NextIDNumbers(n-MAX_IDS_PER_MICRO, ids); + NextIDNumbers(MAX_IDS_PER_MICRO, ids); return; } if (n <= 0) { @@ -41,12 +40,11 @@ void SimpleIDGenerator::nextIDNumbers(size_t n, IDNumbers& ids) { for (int pos=0; pos #include "Types.h" +#include +#include + namespace zilliz { namespace vecwise { namespace engine { class IDGenerator { public: - virtual IDNumber getNextIDNumber() = 0; - virtual void getNextIDNumbers(size_t n, IDNumbers& ids) = 0; + virtual IDNumber GetNextIDNumber() = 0; + virtual void GetNextIDNumbers(size_t n, IDNumbers& ids) = 0; virtual ~IDGenerator(); @@ -24,11 +26,11 @@ public: class SimpleIDGenerator : public IDGenerator { public: - virtual IDNumber getNextIDNumber() override; - virtual void getNextIDNumbers(size_t n, IDNumbers& ids) override; + virtual IDNumber GetNextIDNumber() override; + virtual void GetNextIDNumbers(size_t n, IDNumbers& ids) override; private: - void nextIDNumbers(size_t n, IDNumbers& ids); + void NextIDNumbers(size_t n, IDNumbers& ids); const size_t MAX_IDS_PER_MICRO = 1000; }; // SimpleIDGenerator diff --git a/cpp/src/db/LocalMetaImpl.cpp b/cpp/src/db/LocalMetaImpl.cpp deleted file mode 100644 index aa852a3db6..0000000000 --- a/cpp/src/db/LocalMetaImpl.cpp +++ /dev/null @@ -1,277 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved -// Unauthorized copying of this file, via any medium is strictly prohibited. -// Proprietary and confidential. -//////////////////////////////////////////////////////////////////////////////// -#include -#include -#include -#include -#include -#include -#include - -#include -#include "LocalMetaImpl.h" -#include "IDGenerator.h" - -namespace zilliz { -namespace vecwise { -namespace engine { -namespace meta { - -long LocalMetaImpl::GetFileSize(const std::string& filename) -{ - struct stat stat_buf; - int rc = stat(filename.c_str(), &stat_buf); - return rc == 0 ? stat_buf.st_size : -1; -} - -std::string LocalMetaImpl::GetGroupPath(const std::string& group_id) { - return _options.path + "/" + group_id; -} - -std::string LocalMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) { - std::stringstream ss; - ss << GetGroupPath(group_id) << "/" << date; - return ss.str(); -} - -std::string LocalMetaImpl::GetNextGroupFileLocationByPartition(const std::string& group_id, DateT& date, - GroupFileSchema::FILE_TYPE file_type) { - std::string suffix = (file_type == GroupFileSchema::RAW) ? ".raw" : ".index"; - SimpleIDGenerator g; - std::stringstream ss; - ss << GetGroupPath(group_id) << "/" << date << "/" << g.getNextIDNumber() << suffix; - return ss.str(); -} - -std::string LocalMetaImpl::GetGroupMetaPathByGroupPath(const std::string& group_path) { - return group_path + "/" + "meta"; -} - -std::string LocalMetaImpl::GetGroupMetaPath(const std::string& group_id) { - return GetGroupMetaPathByGroupPath(GetGroupPath(group_id)); -} - -Status LocalMetaImpl::GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info) { - boost::property_tree::ptree ptree; - boost::property_tree::read_json(path, ptree); - auto files_cnt = ptree.get_child("files_cnt").data(); - auto dimension = ptree.get_child("dimension").data(); - /* std::cout << dimension << std::endl; */ - /* std::cout << files_cnt << std::endl; */ - - group_info.id = std::stoi(group_info.group_id); - group_info.files_cnt = std::stoi(files_cnt); - group_info.dimension = std::stoi(dimension); - group_info.location = GetGroupPath(group_info.group_id); - - return Status::OK(); - -} - -Status LocalMetaImpl::GetGroupMetaInfo(const std::string& group_id, GroupSchema& group_info) { - group_info.group_id = group_id; - return GetGroupMetaInfoByPath(GetGroupMetaPath(group_id), group_info); -} - -LocalMetaImpl::LocalMetaImpl(const DBMetaOptions& options_) - : _options(options_) { - initialize(); -} - -Status LocalMetaImpl::initialize() { - if (boost::filesystem::is_directory(_options.path)) { - } - else if (!boost::filesystem::create_directory(_options.path)) { - return Status::InvalidDBPath("Cannot Create " + _options.path); - } - return Status::OK(); -} - -Status LocalMetaImpl::add_group(GroupSchema& group_info) { - std::string real_gid; - size_t id = SimpleIDGenerator().getNextIDNumber(); - if (group_info.group_id == "") { - std::stringstream ss; - ss << id; - real_gid = ss.str(); - } else { - real_gid = group_info.group_id; - } - - bool group_exist; - has_group(real_gid, group_exist); - if (group_exist) { - return Status::GroupError("Group Already Existed " + real_gid); - } - if (!boost::filesystem::create_directory(GetGroupPath(real_gid))) { - return Status::GroupError("Cannot Create Group " + real_gid); - } - - group_info.group_id = real_gid; - group_info.files_cnt = 0; - group_info.id = 0; - group_info.location = GetGroupPath(real_gid); - - boost::property_tree::ptree out; - out.put("files_cnt", group_info.files_cnt); - out.put("dimension", group_info.dimension); - boost::property_tree::write_json(GetGroupMetaPath(real_gid), out); - - return Status::OK(); -} - -Status LocalMetaImpl::get_group(GroupSchema& group_info) { - bool group_exist; - has_group(group_info.group_id, group_exist); - if (!group_exist) { - return Status::NotFound("Group " + group_info.group_id + " Not Found"); - } - - return GetGroupMetaInfo(group_info.group_id, group_info); -} - -Status LocalMetaImpl::has_group(const std::string& group_id, bool& has_or_not) { - has_or_not = boost::filesystem::is_directory(GetGroupPath(group_id)); - return Status::OK(); -} - -Status LocalMetaImpl::add_group_file(GroupFileSchema& group_file_info) { - GroupSchema group_info; - /* auto status = get_group(group_info); */ - /* if (!status.ok()) { */ - /* return status; */ - /* } */ - /* auto location = GetNextGroupFileLocationByPartition(group_id, date, file_type); */ - /* group_file_info.group_id = group_id; */ - /* group_file_info.dimension = group_info.dimension; */ - /* group_file_info.location = location; */ - /* group_file_info.date = date; */ - return Status::OK(); -} - -Status LocalMetaImpl::files_to_index(GroupFilesSchema& files) { - files.clear(); - - std::string suffix; - boost::filesystem::directory_iterator end_itr; - for (boost::filesystem::directory_iterator itr(_options.path); itr != end_itr; ++itr) { - auto group_path = itr->path().string(); - GroupSchema group_info; - GetGroupMetaInfoByPath(GetGroupMetaPathByGroupPath(group_path), group_info); - for (boost::filesystem::directory_iterator innerItr(group_path); innerItr != end_itr; ++innerItr) { - auto partition_path = innerItr->path().string(); - for (boost::filesystem::directory_iterator fItr(partition_path); fItr != end_itr; ++fItr) { - auto location = fItr->path().string(); - suffix = location.substr(location.find_last_of('.') + 1); - if (suffix == "index") continue; - if (INDEX_TRIGGER_SIZE >= GetFileSize(location)) continue; - std::cout << "[About to index] " << location << std::endl; - GroupFileSchema f; - f.location = location; - /* f.group_id = group_id; */ - f.dimension = group_info.dimension; - files.push_back(f); - } - } - } - - return Status::OK(); -} - -Status LocalMetaImpl::files_to_merge(const std::string& group_id, - DatePartionedGroupFilesSchema& files) { - files.clear(); - /* std::string suffix; */ - /* boost::filesystem::directory_iterator end_itr; */ - /* for (boost::filesystem::directory_iterator itr(_options.path); itr != end_itr; ++itr) { */ - /* auto group_path = itr->path().string(); */ - /* GroupSchema group_info; */ - /* GetGroupMetaInfoByPath(GetGroupMetaPathByGroupPath(group_path), group_info); */ - /* for (boost::filesystem::directory_iterator innerItr(group_path); innerItr != end_itr; ++innerItr) { */ - /* auto partition_path = innerItr->path().string(); */ - /* for (boost::filesystem::directory_iterator fItr(partition_path); fItr != end_itr; ++fItr) { */ - /* auto location = fItr->path().string(); */ - /* suffix = location.substr(location.find_last_of('.') + 1); */ - /* if (suffix == "index") continue; */ - /* if (INDEX_TRIGGER_SIZE < GetFileSize(location)) continue; */ - /* std::cout << "[About to index] " << location << std::endl; */ - /* GroupFileSchema f; */ - /* f.location = location; */ - /* f.group_id = group_id; */ - /* f.dimension = group_info.dimension; */ - /* files.push_back(f); */ - /* } */ - /* } */ - /* } */ - - return Status::OK(); -} - -Status LocalMetaImpl::has_group_file(const std::string& group_id_, - const std::string& file_id_, - bool& has_or_not_) { - //PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::get_group_file(const std::string& group_id_, - const std::string& file_id_, - GroupFileSchema& group_file_info_) { - //PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::get_group_files(const std::string& group_id_, - const int date_delta_, - GroupFilesSchema& group_files_info_) { - // PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::update_group_file(GroupFileSchema& group_file_) { - //PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::update_files(GroupFilesSchema& files) { - //PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::archive_files() { - //PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::cleanup() { - //PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::cleanup_ttl_files(uint16_t seconds) { - // PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::drop_all() { - // PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::size(long& result) { - // PXU TODO - return Status::OK(); -} - -Status LocalMetaImpl::count(const std::string& group_id, long& result) { - // PXU TODO - return Status::OK(); -} - -} // namespace meta -} // namespace engine -} // namespace vecwise -} // namespace zilliz diff --git a/cpp/src/db/LocalMetaImpl.h b/cpp/src/db/LocalMetaImpl.h deleted file mode 100644 index fb989d5f67..0000000000 --- a/cpp/src/db/LocalMetaImpl.h +++ /dev/null @@ -1,83 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved -// Unauthorized copying of this file, via any medium is strictly prohibited. -// Proprietary and confidential. -//////////////////////////////////////////////////////////////////////////////// -#pragma once - -#include "Meta.h" -#include "Options.h" - -namespace zilliz { -namespace vecwise { -namespace engine { -namespace meta { - -class LocalMetaImpl : public Meta { -public: - const size_t INDEX_TRIGGER_SIZE = 1024*1024*500; - LocalMetaImpl(const DBMetaOptions& options_); - - virtual Status add_group(GroupSchema& group_info_) override; - virtual Status get_group(GroupSchema& group_info_) override; - virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; - - virtual Status add_group_file(GroupFileSchema& group_file_info) override; - /* virtual Status delete_group_partitions(const std::string& group_id, */ - /* const meta::DatesT& dates) override; */ - - virtual Status has_group_file(const std::string& group_id_, - const std::string& file_id_, - bool& has_or_not_) override; - virtual Status get_group_file(const std::string& group_id_, - const std::string& file_id_, - GroupFileSchema& group_file_info_) override; - virtual Status update_group_file(GroupFileSchema& group_file_) override; - - virtual Status get_group_files(const std::string& group_id_, - const int date_delta_, - GroupFilesSchema& group_files_info_) override; - - virtual Status update_files(GroupFilesSchema& files) override; - - virtual Status cleanup() override; - - virtual Status files_to_merge(const std::string& group_id, - DatePartionedGroupFilesSchema& files) override; - - virtual Status files_to_index(GroupFilesSchema&) override; - - virtual Status archive_files() override; - - virtual Status cleanup_ttl_files(uint16_t seconds) override; - - virtual Status count(const std::string& group_id, long& result) override; - - virtual Status drop_all() override; - - virtual Status size(long& result) override; - -private: - - Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info); - std::string GetGroupMetaPathByGroupPath(const std::string& group_path); - Status GetGroupMetaInfo(const std::string& group_id, GroupSchema& group_info); - std::string GetNextGroupFileLocationByPartition(const std::string& group_id, DateT& date, - GroupFileSchema::FILE_TYPE file_type); - std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date); - std::string GetGroupPath(const std::string& group_id); - std::string GetGroupMetaPath(const std::string& group_id); - - Status CreateGroupMeta(const GroupSchema& group_schema); - long GetFileSize(const std::string& filename); - - Status initialize(); - - const DBMetaOptions _options; - -}; // LocalMetaImpl - -} // namespace meta -} // namespace engine -} // namespace vecwise -} // namespace zilliz diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index 9b526dc1d9..35272f4211 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -5,15 +5,15 @@ ******************************************************************************/ #pragma once +#include "IDGenerator.h" +#include "Status.h" +#include "Meta.h" + #include #include #include #include #include -#include "IDGenerator.h" -#include "Status.h" -#include "Meta.h" - namespace zilliz { namespace vecwise { @@ -26,24 +26,24 @@ namespace meta { template class MemVectors { public: - typedef typename EngineT::Ptr EnginePtr; - typedef typename meta::Meta::Ptr MetaPtr; - typedef std::shared_ptr> Ptr; + using EnginePtr = typename EngineT::Ptr; + using MetaPtr = meta::Meta::Ptr; + using Ptr = std::shared_ptr>; explicit MemVectors(const std::shared_ptr&, - const meta::GroupFileSchema&, const Options&); + const meta::TableFileSchema&, const Options&); - void add(size_t n_, const float* vectors_, IDNumbers& vector_ids_); + void Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_); - size_t total() const; + size_t Total() const; - size_t approximate_size() const; + size_t ApproximateSize() const; - Status serialize(std::string& group_id); + Status Serialize(std::string& table_id); ~MemVectors(); - const std::string& location() const { return schema_.location; } + const std::string& Location() const { return schema_.location; } private: MemVectors() = delete; @@ -52,8 +52,8 @@ private: MetaPtr pMeta_; Options options_; - meta::GroupFileSchema schema_; - IDGenerator* _pIdGenerator; + meta::TableFileSchema schema_; + IDGenerator* pIdGenerator_; EnginePtr pEE_; }; // MemVectors @@ -63,32 +63,32 @@ private: template class MemManager { public: - typedef typename meta::Meta::Ptr MetaPtr; - typedef typename MemVectors::Ptr MemVectorsPtr; - typedef std::shared_ptr> Ptr; + using MetaPtr = meta::Meta::Ptr; + using MemVectorsPtr = typename MemVectors::Ptr; + using Ptr = std::shared_ptr>; - MemManager(const std::shared_ptr& meta_, const Options& options) - : _pMeta(meta_), options_(options) {} + MemManager(const std::shared_ptr& meta, const Options& options) + : pMeta_(meta), options_(options) {} - MemVectorsPtr get_mem_by_group(const std::string& group_id_); + MemVectorsPtr GetMemByTable(const std::string& table_id); - Status add_vectors(const std::string& group_id_, - size_t n_, const float* vectors_, IDNumbers& vector_ids_); + Status InsertVectors(const std::string& table_id, + size_t n, const float* vectors, IDNumbers& vector_ids); - Status serialize(std::vector& group_ids); + Status Serialize(std::vector& table_ids); private: - Status add_vectors_no_lock(const std::string& group_id_, - size_t n_, const float* vectors_, IDNumbers& vector_ids_); - Status mark_memory_as_immutable(); + Status InsertVectorsNoLock(const std::string& table_id, + size_t n, const float* vectors, IDNumbers& vector_ids); + Status ToImmutable(); - typedef std::map MemMap; - typedef std::vector ImmMemPool; - MemMap _memMap; - ImmMemPool _immMems; - MetaPtr _pMeta; + using MemMap = std::map; + using ImmMemPool = std::vector; + MemMap memMap_; + ImmMemPool immMems_; + MetaPtr pMeta_; Options options_; - std::mutex _mutex; + std::mutex mutex_; std::mutex serialization_mtx_; }; // MemManager @@ -96,4 +96,4 @@ private: } // namespace engine } // namespace vecwise } // namespace zilliz -#include "MemManager.cpp" +#include "MemManager.inl" diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.inl similarity index 51% rename from cpp/src/db/MemManager.cpp rename to cpp/src/db/MemManager.inl index 601146ba66..35e7c70ada 100644 --- a/cpp/src/db/MemManager.cpp +++ b/cpp/src/db/MemManager.inl @@ -3,18 +3,16 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#ifndef MEMMANGE_CPP__ -#define MEMMANGE_CPP__ - -#include -#include -#include -#include +#pragma once #include "MemManager.h" #include "Meta.h" #include "MetaConsts.h" +#include +#include +#include +#include namespace zilliz { namespace vecwise { @@ -22,42 +20,42 @@ namespace engine { template MemVectors::MemVectors(const std::shared_ptr& meta_ptr, - const meta::GroupFileSchema& schema, const Options& options) + const meta::TableFileSchema& schema, const Options& options) : pMeta_(meta_ptr), options_(options), schema_(schema), - _pIdGenerator(new SimpleIDGenerator()), + pIdGenerator_(new SimpleIDGenerator()), pEE_(new EngineT(schema_.dimension, schema_.location)) { } template -void MemVectors::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) { - _pIdGenerator->getNextIDNumbers(n_, vector_ids_); +void MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) { + pIdGenerator_->GetNextIDNumbers(n_, vector_ids_); pEE_->AddWithIds(n_, vectors_, vector_ids_.data()); } template -size_t MemVectors::total() const { +size_t MemVectors::Total() const { return pEE_->Count(); } template -size_t MemVectors::approximate_size() const { +size_t MemVectors::ApproximateSize() const { return pEE_->Size(); } template -Status MemVectors::serialize(std::string& group_id) { - group_id = schema_.group_id; - auto size = approximate_size(); +Status MemVectors::Serialize(std::string& table_id) { + table_id = schema_.table_id; + auto size = ApproximateSize(); pEE_->Serialize(); schema_.size = size; schema_.file_type = (size >= options_.index_trigger_size) ? - meta::GroupFileSchema::TO_INDEX : meta::GroupFileSchema::RAW; + meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; - auto status = pMeta_->update_group_file(schema_); + auto status = pMeta_->UpdateTableFile(schema_); - LOG(DEBUG) << "New " << ((schema_.file_type == meta::GroupFileSchema::RAW) ? "raw" : "to_index") + LOG(DEBUG) << "New " << ((schema_.file_type == meta::TableFileSchema::RAW) ? "raw" : "to_index") << " file " << schema_.file_id << " of size " << pEE_->Size() / meta::M << " M"; pEE_->Cache(); @@ -67,9 +65,9 @@ Status MemVectors::serialize(std::string& group_id) { template MemVectors::~MemVectors() { - if (_pIdGenerator != nullptr) { - delete _pIdGenerator; - _pIdGenerator = nullptr; + if (pIdGenerator_ != nullptr) { + delete pIdGenerator_; + pIdGenerator_ = nullptr; } } @@ -78,69 +76,69 @@ MemVectors::~MemVectors() { */ template -typename MemManager::MemVectorsPtr MemManager::get_mem_by_group( - const std::string& group_id) { - auto memIt = _memMap.find(group_id); - if (memIt != _memMap.end()) { +typename MemManager::MemVectorsPtr MemManager::GetMemByTable( + const std::string& table_id) { + auto memIt = memMap_.find(table_id); + if (memIt != memMap_.end()) { return memIt->second; } - meta::GroupFileSchema group_file; - group_file.group_id = group_id; - auto status = _pMeta->add_group_file(group_file); + meta::TableFileSchema table_file; + table_file.table_id = table_id; + auto status = pMeta_->CreateTableFile(table_file); if (!status.ok()) { return nullptr; } - _memMap[group_id] = MemVectorsPtr(new MemVectors(_pMeta, group_file, options_)); - return _memMap[group_id]; + memMap_[table_id] = MemVectorsPtr(new MemVectors(pMeta_, table_file, options_)); + return memMap_[table_id]; } template -Status MemManager::add_vectors(const std::string& group_id_, +Status MemManager::InsertVectors(const std::string& table_id_, size_t n_, const float* vectors_, IDNumbers& vector_ids_) { - std::unique_lock lock(_mutex); - return add_vectors_no_lock(group_id_, n_, vectors_, vector_ids_); + std::unique_lock lock(mutex_); + return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_); } template -Status MemManager::add_vectors_no_lock(const std::string& group_id, +Status MemManager::InsertVectorsNoLock(const std::string& table_id, size_t n, const float* vectors, IDNumbers& vector_ids) { - MemVectorsPtr mem = get_mem_by_group(group_id); + MemVectorsPtr mem = GetMemByTable(table_id); if (mem == nullptr) { - return Status::NotFound("Group " + group_id + " not found!"); + return Status::NotFound("Group " + table_id + " not found!"); } - mem->add(n, vectors, vector_ids); + mem->Add(n, vectors, vector_ids); return Status::OK(); } template -Status MemManager::mark_memory_as_immutable() { - std::unique_lock lock(_mutex); - for (auto& kv: _memMap) { - _immMems.push_back(kv.second); +Status MemManager::ToImmutable() { + std::unique_lock lock(mutex_); + for (auto& kv: memMap_) { + immMems_.push_back(kv.second); } - _memMap.clear(); + memMap_.clear(); return Status::OK(); } template -Status MemManager::serialize(std::vector& group_ids) { - mark_memory_as_immutable(); +Status MemManager::Serialize(std::vector& table_ids) { + ToImmutable(); std::unique_lock lock(serialization_mtx_); - std::string group_id; - group_ids.clear(); - for (auto& mem : _immMems) { - mem->serialize(group_id); - group_ids.push_back(group_id); + std::string table_id; + table_ids.clear(); + for (auto& mem : immMems_) { + mem->Serialize(table_id); + table_ids.push_back(table_id); } - _immMems.clear(); + immMems_.clear(); return Status::OK(); } @@ -148,5 +146,3 @@ Status MemManager::serialize(std::vector& group_ids) { } // namespace engine } // namespace vecwise } // namespace zilliz - -#endif diff --git a/cpp/src/db/Meta.cpp b/cpp/src/db/Meta.cpp index 3d272e0637..f0539ae1be 100644 --- a/cpp/src/db/Meta.cpp +++ b/cpp/src/db/Meta.cpp @@ -3,9 +3,10 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ +#include "Meta.h" + #include #include -#include "Meta.h" namespace zilliz { namespace vecwise { diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index a4bbb23380..b67191739f 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -4,14 +4,15 @@ * Proprietary and confidential. ******************************************************************************/ #pragma once -#include -#include -#include #include "MetaTypes.h" #include "Options.h" #include "Status.h" +#include +#include +#include + namespace zilliz { namespace vecwise { namespace engine { @@ -20,49 +21,40 @@ namespace meta { class Meta { public: - typedef std::shared_ptr Ptr; + using Ptr = std::shared_ptr; - virtual Status add_group(GroupSchema& group_info) = 0; - virtual Status get_group(GroupSchema& group_info) = 0; - virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0; + virtual Status CreateTable(TableSchema& table_schema) = 0; + virtual Status DescribeTable(TableSchema& table_schema) = 0; + virtual Status HasTable(const std::string& table_id, bool& has_or_not) = 0; - virtual Status add_group_file(GroupFileSchema& group_file_info) = 0; - virtual Status delete_group_partitions(const std::string& group_id, - const meta::DatesT& dates) = 0; + virtual Status CreateTableFile(TableFileSchema& file_schema) = 0; + virtual Status DropPartitionsByDates(const std::string& table_id, + const DatesT& dates) = 0; - virtual Status has_group_file(const std::string& group_id_, - const std::string& file_id_, - bool& has_or_not_) = 0; - virtual Status get_group_file(const std::string& group_id_, - const std::string& file_id_, - GroupFileSchema& group_file_info_) = 0; - virtual Status update_group_file(GroupFileSchema& group_file_) = 0; + virtual Status GetTableFile(TableFileSchema& file_schema) = 0; + virtual Status UpdateTableFile(TableFileSchema& file_schema) = 0; - virtual Status get_group_files(const std::string& group_id_, - const int date_delta_, - GroupFilesSchema& group_files_info_) = 0; + virtual Status UpdateTableFiles(TableFilesSchema& files) = 0; - virtual Status update_files(GroupFilesSchema& files) = 0; - - virtual Status files_to_search(const std::string& group_id, + virtual Status FilesToSearch(const std::string& table_id, const DatesT& partition, - DatePartionedGroupFilesSchema& files) = 0; + DatePartionedTableFilesSchema& files) = 0; - virtual Status files_to_merge(const std::string& group_id, - DatePartionedGroupFilesSchema& files) = 0; + virtual Status FilesToMerge(const std::string& table_id, + DatePartionedTableFilesSchema& files) = 0; - virtual Status size(long& result) = 0; + virtual Status Size(long& result) = 0; - virtual Status archive_files() = 0; + virtual Status Archive() = 0; - virtual Status files_to_index(GroupFilesSchema&) = 0; + virtual Status FilesToIndex(TableFilesSchema&) = 0; - virtual Status cleanup() = 0; - virtual Status cleanup_ttl_files(uint16_t) = 0; + virtual Status CleanUp() = 0; + virtual Status CleanUpFilesWithTTL(uint16_t) = 0; - virtual Status drop_all() = 0; + virtual Status DropAll() = 0; - virtual Status count(const std::string& group_id, long& result) = 0; + virtual Status Count(const std::string& table_id, long& result) = 0; static DateT GetDate(const std::time_t& t, int day_delta = 0); static DateT GetDate(); diff --git a/cpp/src/db/MetaTypes.h b/cpp/src/db/MetaTypes.h index 4b956590ae..b2fe783323 100644 --- a/cpp/src/db/MetaTypes.h +++ b/cpp/src/db/MetaTypes.h @@ -18,16 +18,16 @@ typedef int DateT; const DateT EmptyDate = -1; typedef std::vector DatesT; -struct GroupSchema { +struct TableSchema { size_t id; - std::string group_id; + std::string table_id; size_t files_cnt = 0; uint16_t dimension; - std::string location = ""; + std::string location; long created_on; -}; // GroupSchema +}; // TableSchema -struct GroupFileSchema { +struct TableFileSchema { typedef enum { NEW, RAW, @@ -37,19 +37,19 @@ struct GroupFileSchema { } FILE_TYPE; size_t id; - std::string group_id; + std::string table_id; std::string file_id; int file_type = NEW; size_t size; DateT date = EmptyDate; uint16_t dimension; - std::string location = ""; + std::string location; long updated_time; long created_on; -}; // GroupFileSchema +}; // TableFileSchema -typedef std::vector GroupFilesSchema; -typedef std::map DatePartionedGroupFilesSchema; +typedef std::vector TableFilesSchema; +typedef std::map DatePartionedTableFilesSchema; } // namespace meta } // namespace engine diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index 100ab7b3ab..cd6f1f98b8 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -48,12 +48,6 @@ struct Options { }; // Options -struct GroupOptions { - size_t dimension; - bool has_id = false; -}; // GroupOptions - - } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/Utils.cpp b/cpp/src/db/Utils.cpp index e459bab4bb..9b40152587 100644 --- a/cpp/src/db/Utils.cpp +++ b/cpp/src/db/Utils.cpp @@ -3,9 +3,9 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ +#include "Utils.h" #include -#include "Utils.h" namespace zilliz { namespace vecwise { diff --git a/cpp/src/db/db_connection.cpp b/cpp/src/db/db_connection.cpp deleted file mode 100644 index 74779daf18..0000000000 --- a/cpp/src/db/db_connection.cpp +++ /dev/null @@ -1,31 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved -// Unauthorized copying of this file, via any medium is strictly prohibited. -// Proprietary and confidential. -//////////////////////////////////////////////////////////////////////////////// - -#include "db_connection.h" - - -namespace zilliz { -namespace vecwise { -namespace engine { - -using std::string; -using namespace sqlite_orm; - -string storage_file_name = "default.sqlite"; - -SqliteDBPtr connect() { - SqliteDBPtr temp = std::make_shared(initStorage(storage_file_name)); - temp->sync_schema(); - temp->open_forever(); // thread safe option - //temp->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log - return temp; -} - -/* SqliteDBPtr Connection::connect_ = connect(); */ - -} -} -} diff --git a/cpp/src/db/db_connection.h b/cpp/src/db/db_connection.h deleted file mode 100644 index cfd34461af..0000000000 --- a/cpp/src/db/db_connection.h +++ /dev/null @@ -1,64 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved -// Unauthorized copying of this file, via any medium is strictly prohibited. -// Proprietary and confidential. -//////////////////////////////////////////////////////////////////////////////// - -#pragma once - -#include -#include - -#include - - -namespace zilliz { -namespace vecwise { -namespace engine { - -struct GroupSchema { - size_t id; - std::string group_id; - size_t files_cnt = 0; - uint16_t dimension; - std::string location = ""; - std::string next_file_location = ""; -}; // GroupSchema - - -struct GroupFileSchema { - typedef enum { - RAW, - INDEX - } FILE_TYPE; - - size_t id; - std::string group_id; - std::string file_id; - int files_type = RAW; - size_t rows; - std::string location = ""; -}; // GroupFileSchema - -inline auto initStorage(const std::string &path) { - using namespace sqlite_orm; - return make_storage(path, - // Add table below - make_table("Groups", - make_column("id", &GroupSchema::id, primary_key()), - make_column("group_id", &GroupSchema::group_id, unique()), - make_column("dimension", &GroupSchema::dimension), - make_column("files_cnt", &GroupSchema::files_cnt, default_value(0)))); -} - -using SqliteDB = decltype(initStorage("")); -using SqliteDBPtr= std::shared_ptr; - -class Connection { - protected: - static SqliteDBPtr connect_; -}; - -} -} -} diff --git a/cpp/src/server/MegasearchTask.cpp b/cpp/src/server/MegasearchTask.cpp index 884557ffa4..6f01b38ce1 100644 --- a/cpp/src/server/MegasearchTask.cpp +++ b/cpp/src/server/MegasearchTask.cpp @@ -78,17 +78,17 @@ BaseTaskPtr CreateTableTask::Create(const thrift::TableSchema& schema) { ServerError CreateTableTask::OnExecute() { TimeRecorder rc("CreateTableTask"); - + try { if(schema_.vector_column_array.empty()) { return SERVER_INVALID_ARGUMENT; } IVecIdMapper::GetInstance()->AddGroup(schema_.table_name); - engine::meta::GroupSchema group_info; - group_info.dimension = (uint16_t)schema_.vector_column_array[0].dimension; - group_info.group_id = schema_.table_name; - engine::Status stat = DB()->add_group(group_info); + engine::meta::TableSchema table_schema; + table_schema.dimension = (uint16_t)schema_.vector_column_array[0].dimension; + table_schema.table_id = schema_.table_name; + engine::Status stat = DB()->CreateTable(table_schema); if(!stat.ok()) {//could exist error_msg_ = "Engine failed: " + stat.ToString(); SERVER_LOG_ERROR << error_msg_; @@ -123,9 +123,9 @@ ServerError DescribeTableTask::OnExecute() { TimeRecorder rc("DescribeTableTask"); try { - engine::meta::GroupSchema group_info; - group_info.group_id = table_name_; - engine::Status stat = DB()->get_group(group_info); + engine::meta::TableSchema table_schema; + table_schema.table_id = table_name_; + engine::Status stat = DB()->DescribeTable(table_schema); if(!stat.ok()) { error_code_ = SERVER_GROUP_NOT_EXIST; error_msg_ = "Engine failed: " + stat.ToString(); @@ -154,8 +154,8 @@ DeleteTableTask::DeleteTableTask(const std::string& table_name) } -BaseTaskPtr DeleteTableTask::Create(const std::string& group_id) { - return std::shared_ptr(new DeleteTableTask(group_id)); +BaseTaskPtr DeleteTableTask::Create(const std::string& table_id) { + return std::shared_ptr(new DeleteTableTask(table_id)); } ServerError DeleteTableTask::OnExecute() { @@ -195,9 +195,9 @@ ServerError AddVectorTask::OnExecute() { return SERVER_SUCCESS; } - engine::meta::GroupSchema group_info; - group_info.group_id = table_name_; - engine::Status stat = DB()->get_group(group_info); + engine::meta::TableSchema table_schema; + table_schema.table_id = table_name_; + engine::Status stat = DB()->DescribeTable(table_schema); if(!stat.ok()) { error_code_ = SERVER_GROUP_NOT_EXIST; error_msg_ = "Engine failed: " + stat.ToString(); @@ -208,7 +208,7 @@ ServerError AddVectorTask::OnExecute() { rc.Record("get group info"); uint64_t vec_count = (uint64_t)record_array_.size(); - uint64_t group_dim = group_info.dimension; + uint64_t group_dim = table_schema.dimension; std::vector vec_f; vec_f.resize(vec_count*group_dim);//allocate enough memory for(uint64_t i = 0; i < vec_count; i++) { @@ -236,7 +236,7 @@ ServerError AddVectorTask::OnExecute() { rc.Record("prepare vectors data"); - stat = DB()->add_vectors(table_name_, vec_count, vec_f.data(), record_ids_); + stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_); rc.Record("add vectors to engine"); if(!stat.ok()) { error_code_ = SERVER_UNEXPECTED_ERROR; @@ -293,9 +293,9 @@ ServerError SearchVectorTask::OnExecute() { return error_code_; } - engine::meta::GroupSchema group_info; - group_info.group_id = table_name_; - engine::Status stat = DB()->get_group(group_info); + engine::meta::TableSchema table_schema; + table_schema.table_id = table_name_; + engine::Status stat = DB()->DescribeTable(table_schema); if(!stat.ok()) { error_code_ = SERVER_GROUP_NOT_EXIST; error_msg_ = "Engine failed: " + stat.ToString(); @@ -305,7 +305,7 @@ ServerError SearchVectorTask::OnExecute() { std::vector vec_f; uint64_t record_count = (uint64_t)record_array_.size(); - vec_f.resize(record_count*group_info.dimension); + vec_f.resize(record_count*table_schema.dimension); for(uint64_t i = 0; i < record_array_.size(); i++) { const auto& record = record_array_[i]; @@ -317,9 +317,9 @@ ServerError SearchVectorTask::OnExecute() { } uint64_t vec_dim = record.vector_map.begin()->second.size() / sizeof(double);//how many double value? - if (vec_dim != group_info.dimension) { + if (vec_dim != table_schema.dimension) { SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim - << " vs. group dimension:" << group_info.dimension; + << " vs. group dimension:" << table_schema.dimension; error_code_ = SERVER_INVALID_VECTOR_DIMENSION; error_msg_ = "Engine failed: " + stat.ToString(); return error_code_; @@ -335,7 +335,7 @@ ServerError SearchVectorTask::OnExecute() { std::vector dates; engine::QueryResults results; - stat = DB()->search(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results); + stat = DB()->Query(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results); if(!stat.ok()) { SERVER_LOG_ERROR << "Engine failed: " << stat.ToString(); return SERVER_UNEXPECTED_ERROR; diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index f8ced228e0..4d2bfb3643 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -66,21 +66,21 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) { static const int group_dim = 256; long size; - engine::meta::GroupSchema group_info; + engine::meta::TableSchema group_info; group_info.dimension = group_dim; - group_info.group_id = group_name; - engine::Status stat = db_->add_group(group_info); + group_info.table_id = group_name; + engine::Status stat = db_->CreateTable(group_info); - engine::meta::GroupSchema group_info_get; - group_info_get.group_id = group_name; - stat = db_->get_group(group_info_get); + engine::meta::TableSchema group_info_get; + group_info_get.table_id = group_name; + stat = db_->DescribeTable(group_info_get); ASSERT_STATS(stat); ASSERT_EQ(group_info_get.dimension, group_dim); engine::IDNumbers vector_ids; engine::IDNumbers target_ids; - db_->size(size); + db_->Size(size); int d = 256; int nb = 20; float *xb = new float[d * nb]; @@ -92,13 +92,13 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) { int loop = 100000; for (auto i=0; iadd_vectors(group_name, nb, xb, vector_ids); + db_->InsertVectors(group_name, nb, xb, vector_ids); std::this_thread::sleep_for(std::chrono::microseconds(1)); } std::this_thread::sleep_for(std::chrono::seconds(1)); - db_->size(size); + db_->Size(size); LOG(DEBUG) << "size=" << size; ASSERT_TRUE(size < 1 * engine::meta::G); @@ -111,14 +111,14 @@ TEST_F(DBTest, DB_TEST) { static const std::string group_name = "test_group"; static const int group_dim = 256; - engine::meta::GroupSchema group_info; + engine::meta::TableSchema group_info; group_info.dimension = group_dim; - group_info.group_id = group_name; - engine::Status stat = db_->add_group(group_info); + group_info.table_id = group_name; + engine::Status stat = db_->CreateTable(group_info); - engine::meta::GroupSchema group_info_get; - group_info_get.group_id = group_name; - stat = db_->get_group(group_info_get); + engine::meta::TableSchema group_info_get; + group_info_get.table_id = group_name; + stat = db_->DescribeTable(group_info_get); ASSERT_STATS(stat); ASSERT_EQ(group_info_get.dimension, group_dim); @@ -152,12 +152,12 @@ TEST_F(DBTest, DB_TEST) { for (auto j=0; j<10; ++j) { ss.str(""); - db_->count(group_name, count); + db_->Size(count); prev_count = count; START_TIMER; - stat = db_->search(group_name, k, qb, qxb, results); - ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/engine::meta::M << " M"; + stat = db_->Query(group_name, k, qb, qxb, results); + ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; STOP_TIMER(ss.str()); ASSERT_STATS(stat); @@ -179,10 +179,10 @@ TEST_F(DBTest, DB_TEST) { for (auto i=0; iadd_vectors(group_name, qb, qxb, target_ids); + db_->InsertVectors(group_name, qb, qxb, target_ids); ASSERT_EQ(target_ids.size(), qb); } else { - db_->add_vectors(group_name, nb, xb, vector_ids); + db_->InsertVectors(group_name, nb, xb, vector_ids); } std::this_thread::sleep_for(std::chrono::microseconds(1)); } @@ -197,14 +197,14 @@ TEST_F(DBTest, SEARCH_TEST) { static const std::string group_name = "test_group"; static const int group_dim = 256; - engine::meta::GroupSchema group_info; + engine::meta::TableSchema group_info; group_info.dimension = group_dim; - group_info.group_id = group_name; - engine::Status stat = db_->add_group(group_info); + group_info.table_id = group_name; + engine::Status stat = db_->CreateTable(group_info); - engine::meta::GroupSchema group_info_get; - group_info_get.group_id = group_name; - stat = db_->get_group(group_info_get); + engine::meta::TableSchema group_info_get; + group_info_get.table_id = group_name; + stat = db_->DescribeTable(group_info_get); ASSERT_STATS(stat); ASSERT_EQ(group_info_get.dimension, group_dim); @@ -238,7 +238,7 @@ TEST_F(DBTest, SEARCH_TEST) { // insert data const int batch_size = 100; for (int j = 0; j < nb / batch_size; ++j) { - stat = db_->add_vectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids); + stat = db_->InsertVectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids); if (j == 200){ sleep(1);} ASSERT_STATS(stat); } @@ -246,7 +246,7 @@ TEST_F(DBTest, SEARCH_TEST) { sleep(2); // wait until build index finish engine::QueryResults results; - stat = db_->search(group_name, k, nq, xq.data(), results); + stat = db_->Query(group_name, k, nq, xq.data(), results); ASSERT_STATS(stat); // TODO(linxj): add groundTruth assert diff --git a/cpp/unittest/db/meta_tests.cpp b/cpp/unittest/db/meta_tests.cpp index 3832a3263d..abd03d3057 100644 --- a/cpp/unittest/db/meta_tests.cpp +++ b/cpp/unittest/db/meta_tests.cpp @@ -18,76 +18,76 @@ using namespace zilliz::vecwise::engine; TEST_F(MetaTest, GROUP_TEST) { - auto group_id = "meta_test_group"; + auto table_id = "meta_test_group"; - meta::GroupSchema group; - group.group_id = group_id; - auto status = impl_->add_group(group); + meta::TableSchema group; + group.table_id = table_id; + auto status = impl_->CreateTable(group); ASSERT_TRUE(status.ok()); auto gid = group.id; group.id = -1; - status = impl_->get_group(group); + status = impl_->DescribeTable(group); ASSERT_TRUE(status.ok()); ASSERT_EQ(group.id, gid); - ASSERT_EQ(group.group_id, group_id); + ASSERT_EQ(group.table_id, table_id); - group.group_id = "not_found"; - status = impl_->get_group(group); + group.table_id = "not_found"; + status = impl_->DescribeTable(group); ASSERT_TRUE(!status.ok()); - group.group_id = group_id; - status = impl_->add_group(group); + group.table_id = table_id; + status = impl_->CreateTable(group); ASSERT_TRUE(!status.ok()); } -TEST_F(MetaTest, GROUP_FILE_TEST) { - auto group_id = "meta_test_group"; +TEST_F(MetaTest, table_file_TEST) { + auto table_id = "meta_test_group"; - meta::GroupSchema group; - group.group_id = group_id; - auto status = impl_->add_group(group); + meta::TableSchema group; + group.table_id = table_id; + auto status = impl_->CreateTable(group); - meta::GroupFileSchema group_file; - group_file.group_id = group.group_id; - status = impl_->add_group_file(group_file); + meta::TableFileSchema table_file; + table_file.table_id = group.table_id; + status = impl_->CreateTableFile(table_file); ASSERT_TRUE(status.ok()); - ASSERT_EQ(group_file.file_type, meta::GroupFileSchema::NEW); + ASSERT_EQ(table_file.file_type, meta::TableFileSchema::NEW); - auto file_id = group_file.file_id; + auto file_id = table_file.file_id; - auto new_file_type = meta::GroupFileSchema::INDEX; - group_file.file_type = new_file_type; + auto new_file_type = meta::TableFileSchema::INDEX; + table_file.file_type = new_file_type; - status = impl_->update_group_file(group_file); + status = impl_->UpdateTableFile(table_file); ASSERT_TRUE(status.ok()); - ASSERT_EQ(group_file.file_type, new_file_type); + ASSERT_EQ(table_file.file_type, new_file_type); meta::DatesT dates; dates.push_back(meta::Meta::GetDate()); - status = impl_->delete_group_partitions(group_file.group_id, dates); + status = impl_->DropPartitionsByDates(table_file.table_id, dates); ASSERT_FALSE(status.ok()); dates.clear(); for (auto i=2; i < 10; ++i) { dates.push_back(meta::Meta::GetDateWithDelta(-1*i)); } - status = impl_->delete_group_partitions(group_file.group_id, dates); + status = impl_->DropPartitionsByDates(table_file.table_id, dates); ASSERT_TRUE(status.ok()); - group_file.date = meta::Meta::GetDateWithDelta(-2); - status = impl_->update_group_file(group_file); + table_file.date = meta::Meta::GetDateWithDelta(-2); + status = impl_->UpdateTableFile(table_file); ASSERT_TRUE(status.ok()); - ASSERT_EQ(group_file.date, meta::Meta::GetDateWithDelta(-2)); - ASSERT_FALSE(group_file.file_type == meta::GroupFileSchema::TO_DELETE); + ASSERT_EQ(table_file.date, meta::Meta::GetDateWithDelta(-2)); + ASSERT_FALSE(table_file.file_type == meta::TableFileSchema::TO_DELETE); dates.clear(); - dates.push_back(group_file.date); - status = impl_->delete_group_partitions(group_file.group_id, dates); + dates.push_back(table_file.date); + status = impl_->DropPartitionsByDates(table_file.table_id, dates); ASSERT_TRUE(status.ok()); - status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file); + status = impl_->GetTableFile(table_file); ASSERT_TRUE(status.ok()); - ASSERT_TRUE(group_file.file_type == meta::GroupFileSchema::TO_DELETE); + ASSERT_TRUE(table_file.file_type == meta::TableFileSchema::TO_DELETE); } TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { @@ -100,44 +100,44 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { options.archive_conf = ArchiveConf("delete", ss.str()); auto impl = meta::DBMetaImpl(options); - auto group_id = "meta_test_group"; + auto table_id = "meta_test_group"; - meta::GroupSchema group; - group.group_id = group_id; - auto status = impl.add_group(group); + meta::TableSchema group; + group.table_id = table_id; + auto status = impl.CreateTable(group); - meta::GroupFilesSchema files; - meta::GroupFileSchema group_file; - group_file.group_id = group.group_id; + meta::TableFilesSchema files; + meta::TableFileSchema table_file; + table_file.table_id = group.table_id; auto cnt = 100; long ts = utils::GetMicroSecTimeStamp(); std::vector days; for (auto i=0; iadd_group(group); + meta::TableSchema group; + group.table_id = table_id; + auto status = impl_->CreateTable(group); int new_files_cnt = 4; int raw_files_cnt = 5; int to_index_files_cnt = 6; int index_files_cnt = 7; - meta::GroupFileSchema group_file; - group_file.group_id = group.group_id; + meta::TableFileSchema table_file; + table_file.table_id = group.table_id; for (auto i=0; iadd_group_file(group_file); - group_file.file_type = meta::GroupFileSchema::NEW; - status = impl_->update_group_file(group_file); + status = impl_->CreateTableFile(table_file); + table_file.file_type = meta::TableFileSchema::NEW; + status = impl_->UpdateTableFile(table_file); } for (auto i=0; iadd_group_file(group_file); - group_file.file_type = meta::GroupFileSchema::RAW; - status = impl_->update_group_file(group_file); + status = impl_->CreateTableFile(table_file); + table_file.file_type = meta::TableFileSchema::RAW; + status = impl_->UpdateTableFile(table_file); } for (auto i=0; iadd_group_file(group_file); - group_file.file_type = meta::GroupFileSchema::TO_INDEX; - status = impl_->update_group_file(group_file); + status = impl_->CreateTableFile(table_file); + table_file.file_type = meta::TableFileSchema::TO_INDEX; + status = impl_->UpdateTableFile(table_file); } for (auto i=0; iadd_group_file(group_file); - group_file.file_type = meta::GroupFileSchema::INDEX; - status = impl_->update_group_file(group_file); + status = impl_->CreateTableFile(table_file); + table_file.file_type = meta::TableFileSchema::INDEX; + status = impl_->UpdateTableFile(table_file); } - meta::GroupFilesSchema files; + meta::TableFilesSchema files; - status = impl_->files_to_index(files); + status = impl_->FilesToIndex(files); ASSERT_TRUE(status.ok()); ASSERT_EQ(files.size(), to_index_files_cnt); - meta::DatePartionedGroupFilesSchema dated_files; - status = impl_->files_to_merge(group.group_id, dated_files); + meta::DatePartionedTableFilesSchema dated_files; + status = impl_->FilesToMerge(group.table_id, dated_files); ASSERT_TRUE(status.ok()); - ASSERT_EQ(dated_files[group_file.date].size(), raw_files_cnt); + ASSERT_EQ(dated_files[table_file.date].size(), raw_files_cnt); - status = impl_->files_to_index(files); + status = impl_->FilesToIndex(files); ASSERT_TRUE(status.ok()); ASSERT_EQ(files.size(), to_index_files_cnt); - meta::DatesT dates = {group_file.date}; - status = impl_->files_to_search(group_id, dates, dated_files); + meta::DatesT dates = {table_file.date}; + status = impl_->FilesToSearch(table_id, dates, dated_files); ASSERT_TRUE(status.ok()); - ASSERT_EQ(dated_files[group_file.date].size(), + ASSERT_EQ(dated_files[table_file.date].size(), to_index_files_cnt+raw_files_cnt+index_files_cnt); } diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index 09428427fe..aa3ea560f3 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -59,5 +59,5 @@ void MetaTest::SetUp() { } void MetaTest::TearDown() { - impl_->drop_all(); + impl_->DropAll(); }