diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 2080fec550..263f4a787a 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -10,7 +10,9 @@ Please mark all change in change log and use the ticket from JIRA. ### New Feature +- MS-5 - Implement Auto Archive Feature + ### Task - MS-1 - Add CHANGELOG.md -- MS-4 - Refactor the vecwise_engine code structure \ No newline at end of file +- MS-4 - Refactor the vecwise_engine code structure diff --git a/cpp/src/db/DB.h b/cpp/src/db/DB.h index 450a980c4a..7d976ad824 100644 --- a/cpp/src/db/DB.h +++ b/cpp/src/db/DB.h @@ -23,6 +23,8 @@ public: virtual Status add_group(meta::GroupSchema& group_info_) = 0; virtual Status get_group(meta::GroupSchema& group_info_) = 0; + virtual Status delete_vectors(const std::string& group_id, + const meta::DatesT& dates) = 0; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, @@ -37,6 +39,8 @@ public: virtual Status search(const std::string& group_id, size_t k, size_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) = 0; + virtual Status size(long& result) = 0; + virtual Status drop_all() = 0; virtual Status count(const std::string& group_id, long& result) = 0; diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 216a9b352d..4028a1bbbe 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -44,6 +44,12 @@ Status DBImpl::get_group(meta::GroupSchema& group_info) { return _pMeta->get_group(group_info); } +template +Status DBImpl::delete_vectors(const std::string& group_id, + const meta::DatesT& dates) { + return _pMeta->delete_group_partitions(group_id, dates); +} + template Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) { return _pMeta->has_group(group_id_, has_or_not_); @@ -286,7 +292,7 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::Dat } else { group_file.file_type = meta::GroupFileSchema::RAW; } - group_file.rows = index_size; + group_file.size = index_size; updated.push_back(group_file); status = _pMeta->update_files(updated); LOG(DEBUG) << "New merged file " << group_file.file_id << @@ -320,6 +326,8 @@ Status DBImpl::background_merge_files(const std::string& group_id) { merge_files(group_id, kv.first, kv.second); } + _pMeta->archive_files(); + try_build_index(); _pMeta->cleanup_ttl_files(1); @@ -343,7 +351,7 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { auto index = to_index.BuildIndex(group_file.location); group_file.file_type = meta::GroupFileSchema::INDEX; - group_file.rows = index->Size(); + group_file.size = index->Size(); auto to_remove = file; to_remove.file_type = meta::GroupFileSchema::TO_DELETE; @@ -356,6 +364,7 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { << " from file " << to_remove.file_id; index->Cache(); + _pMeta->archive_files(); return Status::OK(); } @@ -416,6 +425,11 @@ Status DBImpl::count(const std::string& group_id, long& result) { return _pMeta->count(group_id, result); } +template +Status DBImpl::size(long& result) { + return _pMeta->size(result); +} + template DBImpl::~DBImpl() { { diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index d2aed0af1d..54c22eb48b 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -35,6 +35,7 @@ public: virtual Status add_group(meta::GroupSchema& group_info) override; virtual Status get_group(meta::GroupSchema& group_info) override; + virtual Status delete_vectors(const std::string& group_id, const meta::DatesT& dates) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; virtual Status get_group_files(const std::string& group_id_, @@ -54,6 +55,8 @@ public: virtual Status count(const std::string& group_id, long& result) override; + virtual Status size(long& result) override; + virtual ~DBImpl(); private: diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index aaaaf21ce4..9b4e731f74 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -12,8 +12,11 @@ #include #include #include + #include "DBMetaImpl.h" #include "IDGenerator.h" +#include "Utils.h" +#include "MetaConsts.h" namespace zilliz { namespace vecwise { @@ -28,39 +31,26 @@ inline auto StoragePrototype(const std::string& path) { make_column("id", &GroupSchema::id, primary_key()), make_column("group_id", &GroupSchema::group_id, unique()), make_column("dimension", &GroupSchema::dimension), + make_column("created_on", &GroupSchema::created_on), make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))), make_table("GroupFile", make_column("id", &GroupFileSchema::id, primary_key()), make_column("group_id", &GroupFileSchema::group_id), make_column("file_id", &GroupFileSchema::file_id), make_column("file_type", &GroupFileSchema::file_type), - make_column("rows", &GroupFileSchema::rows, default_value(0)), + make_column("size", &GroupFileSchema::size, default_value(0)), make_column("updated_time", &GroupFileSchema::updated_time), + make_column("created_on", &GroupFileSchema::created_on), make_column("date", &GroupFileSchema::date)) ); } -using ConnectorT = decltype(StoragePrototype("/tmp/dummy.sqlite3")); +using ConnectorT = decltype(StoragePrototype("")); static std::unique_ptr ConnectorPtr; -long GetFileSize(const std::string& filename) -{ - struct stat stat_buf; - int rc = stat(filename.c_str(), &stat_buf); - return rc == 0 ? stat_buf.st_size : -1; -} - std::string DBMetaImpl::GetGroupPath(const std::string& group_id) { - return _options.path + "/" + group_id; -} - -long DBMetaImpl::GetMicroSecTimeStamp() { - auto now = std::chrono::system_clock::now(); - auto micros = std::chrono::duration_cast( - now.time_since_epoch()).count(); - - return micros; + return _options.path + "/tables/" + group_id; } std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) { @@ -79,6 +69,22 @@ void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) { group_file.location = ss.str(); } +Status DBMetaImpl::NextGroupId(std::string& group_id) { + std::stringstream ss; + SimpleIDGenerator g; + ss << g.getNextIDNumber(); + group_id = ss.str(); + return Status::OK(); +} + +Status DBMetaImpl::NextFileId(std::string& file_id) { + std::stringstream ss; + SimpleIDGenerator g; + ss << g.getNextIDNumber(); + file_id = ss.str(); + return Status::OK(); +} + DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_) : _options(options_) { initialize(); @@ -104,21 +110,56 @@ Status DBMetaImpl::initialize() { return Status::OK(); } +// PXU TODO: Temp solution. Will fix later +Status DBMetaImpl::delete_group_partitions(const std::string& group_id, + const meta::DatesT& dates) { + if (dates.size() == 0) { + return Status::OK(); + } + + GroupSchema group_info; + group_info.group_id = group_id; + auto status = get_group(group_info); + if (!status.ok()) { + return status; + } + + auto yesterday = GetDateWithDelta(-1); + + for (auto& date : dates) { + if (date >= yesterday) { + return Status::Error("Could not delete partitions with 2 days"); + } + } + + try { + ConnectorPtr->update_all( + set( + c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE + ), + where( + c(&GroupFileSchema::group_id) == group_id and + in(&GroupFileSchema::date, dates) + )); + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; + } + return Status::OK(); +} + Status DBMetaImpl::add_group(GroupSchema& group_info) { if (group_info.group_id == "") { - std::stringstream ss; - SimpleIDGenerator g; - ss << g.getNextIDNumber(); - group_info.group_id = ss.str(); + NextGroupId(group_info.group_id); } group_info.files_cnt = 0; group_info.id = -1; + group_info.created_on = utils::GetMicroSecTimeStamp(); { try { auto id = ConnectorPtr->insert(group_info); group_info.id = id; - /* LOG(DEBUG) << "Add group " << id; */ } catch (...) { return Status::DBTransactionError("Add Group Error"); } @@ -127,7 +168,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) { auto group_path = GetGroupPath(group_info.group_id); if (!boost::filesystem::is_directory(group_path)) { - auto ret = boost::filesystem::create_directory(group_path); + auto ret = boost::filesystem::create_directories(group_path); if (!ret) { LOG(ERROR) << "Create directory " << group_path << " Error"; } @@ -192,21 +233,18 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { return status; } - SimpleIDGenerator g; - std::stringstream ss; - ss << g.getNextIDNumber(); + NextFileId(group_file.file_id); group_file.file_type = GroupFileSchema::NEW; - group_file.file_id = ss.str(); group_file.dimension = group_info.dimension; - group_file.rows = 0; - group_file.updated_time = GetMicroSecTimeStamp(); //ConnectorPtr->select(datetime("now", "localtime +1 hour")).front(); + group_file.size = 0; + group_file.created_on = utils::GetMicroSecTimeStamp(); + group_file.updated_time = group_file.created_on; GetGroupFilePath(group_file); { try { auto id = ConnectorPtr->insert(group_file); group_file.id = id; - /* LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id; */ } catch (...) { return Status::DBTransactionError("Add file Error"); } @@ -233,7 +271,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { &GroupFileSchema::group_id, &GroupFileSchema::file_id, &GroupFileSchema::file_type, - &GroupFileSchema::rows, + &GroupFileSchema::size, &GroupFileSchema::date), where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX)); @@ -245,7 +283,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { group_file.group_id = std::get<1>(file); group_file.file_id = std::get<2>(file); group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); + group_file.size = std::get<4>(file); group_file.date = std::get<5>(file); GetGroupFilePath(group_file); auto groupItr = groups.find(group_file.group_id); @@ -281,7 +319,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id, &GroupFileSchema::group_id, &GroupFileSchema::file_id, &GroupFileSchema::file_type, - &GroupFileSchema::rows, + &GroupFileSchema::size, &GroupFileSchema::date), where(c(&GroupFileSchema::group_id) == group_id and in(&GroupFileSchema::date, dates) and @@ -302,7 +340,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id, group_file.group_id = std::get<1>(file); group_file.file_id = std::get<2>(file); group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); + group_file.size = std::get<4>(file); group_file.date = std::get<5>(file); group_file.dimension = group_info.dimension; GetGroupFilePath(group_file); @@ -329,7 +367,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id, &GroupFileSchema::group_id, &GroupFileSchema::file_id, &GroupFileSchema::file_type, - &GroupFileSchema::rows, + &GroupFileSchema::size, &GroupFileSchema::date), where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and c(&GroupFileSchema::group_id) == group_id)); @@ -347,7 +385,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id, group_file.group_id = std::get<1>(file); group_file.file_id = std::get<2>(file); group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); + group_file.size = std::get<4>(file); group_file.date = std::get<5>(file); group_file.dimension = group_info.dimension; GetGroupFilePath(group_file); @@ -375,7 +413,32 @@ Status DBMetaImpl::has_group_file(const std::string& group_id_, Status DBMetaImpl::get_group_file(const std::string& group_id_, const std::string& file_id_, GroupFileSchema& group_file_info_) { - //PXU TODO + try { + auto files = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::size, + &GroupFileSchema::date), + where(c(&GroupFileSchema::file_id) == file_id_ and + c(&GroupFileSchema::group_id) == group_id_ + )); + assert(files.size() <= 1); + if (files.size() == 1) { + group_file_info_.id = std::get<0>(files[0]); + group_file_info_.group_id = std::get<1>(files[0]); + group_file_info_.file_id = std::get<2>(files[0]); + group_file_info_.file_type = std::get<3>(files[0]); + group_file_info_.size = std::get<4>(files[0]); + group_file_info_.date = std::get<5>(files[0]); + } else { + return Status::NotFound("GroupFile " + file_id_ + " not found"); + } + } catch (std::exception &e) { + LOG(DEBUG) << e.what(); + throw e; + } + return Status::OK(); } @@ -386,17 +449,117 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_, return Status::OK(); } +// PXU TODO: Support Swap +Status DBMetaImpl::archive_files() { + auto& criterias = _options.archive_conf.GetCriterias(); + if (criterias.size() == 0) { + return Status::OK(); + } + + for (auto kv : criterias) { + auto& criteria = kv.first; + auto& limit = kv.second; + if (criteria == "days") { + long usecs = limit * D_SEC * US_PS; + long now = utils::GetMicroSecTimeStamp(); + try + { + ConnectorPtr->update_all( + set( + c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE + ), + where( + c(&GroupFileSchema::created_on) < (long)(now - usecs) and + c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE + )); + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; + } + } + if (criteria == "disk") { + long sum = 0; + size(sum); + + // PXU TODO: refactor size + auto to_delete = (sum - limit*G); + discard_files_of_size(to_delete); + } + } + + return Status::OK(); +} + +Status DBMetaImpl::size(long& result) { + result = 0; + try { + auto selected = ConnectorPtr->select(columns(sum(&GroupFileSchema::size)), + where( + c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE + )); + + for (auto& sub_query : selected) { + if(!std::get<0>(sub_query)) { + continue; + } + result += (long)(*std::get<0>(sub_query)); + } + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; + } + + return Status::OK(); +} + +Status DBMetaImpl::discard_files_of_size(long to_discard_size) { + LOG(DEBUG) << "Abort to discard size=" << to_discard_size; + if (to_discard_size <= 0) { + return Status::OK(); + } + try { + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::size), + where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE), + order_by(&GroupFileSchema::id), + limit(10)); + std::vector ids; + + for (auto& file : selected) { + if (to_discard_size <= 0) break; + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.size = std::get<1>(file); + ids.push_back(group_file.id); + LOG(DEBUG) << "Discard group_file.id=" << group_file.id << " group_file.size=" << group_file.size; + to_discard_size -= group_file.size; + } + + if (ids.size() == 0) { + return Status::OK(); + } + + ConnectorPtr->update_all( + set( + c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE + ), + where( + in(&GroupFileSchema::id, ids) + )); + + } catch (std::exception & e) { + LOG(DEBUG) << e.what(); + throw e; + } + + + return discard_files_of_size(to_discard_size); +} + Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { - group_file.updated_time = GetMicroSecTimeStamp(); + group_file.updated_time = utils::GetMicroSecTimeStamp(); try { ConnectorPtr->update(group_file); - /* auto commited = ConnectorPtr->transaction([&] () mutable { */ - /* ConnectorPtr->update(group_file); */ - /* return true; */ - /* }); */ - /* if (!commited) { */ - /* return Status::DBTransactionError("Update file Error"); */ - /* } */ } catch (std::exception & e) { LOG(DEBUG) << e.what(); LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id; @@ -409,7 +572,7 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) { try { auto commited = ConnectorPtr->transaction([&] () mutable { for (auto& file : files) { - file.updated_time = GetMicroSecTimeStamp(); + file.updated_time = utils::GetMicroSecTimeStamp(); ConnectorPtr->update(file); } return true; @@ -425,16 +588,16 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) { } Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) { - auto now = GetMicroSecTimeStamp(); + auto now = utils::GetMicroSecTimeStamp(); try { auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, &GroupFileSchema::file_id, &GroupFileSchema::file_type, - &GroupFileSchema::rows, + &GroupFileSchema::size, &GroupFileSchema::date), where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and - c(&GroupFileSchema::updated_time) > now - 1000000*seconds)); + c(&GroupFileSchema::updated_time) > now - seconds*US_PS)); GroupFilesSchema updated; @@ -444,7 +607,7 @@ Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) { group_file.group_id = std::get<1>(file); group_file.file_id = std::get<2>(file); group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); + group_file.size = std::get<4>(file); group_file.date = std::get<5>(file); GetGroupFilePath(group_file); if (group_file.file_type == GroupFileSchema::TO_DELETE) { @@ -467,7 +630,7 @@ Status DBMetaImpl::cleanup() { &GroupFileSchema::group_id, &GroupFileSchema::file_id, &GroupFileSchema::file_type, - &GroupFileSchema::rows, + &GroupFileSchema::size, &GroupFileSchema::date), where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW)); @@ -480,7 +643,7 @@ Status DBMetaImpl::cleanup() { group_file.group_id = std::get<1>(file); group_file.file_id = std::get<2>(file); group_file.file_type = std::get<3>(file); - group_file.rows = std::get<4>(file); + group_file.size = std::get<4>(file); group_file.date = std::get<5>(file); GetGroupFilePath(group_file); if (group_file.file_type == GroupFileSchema::TO_DELETE) { @@ -500,7 +663,7 @@ Status DBMetaImpl::cleanup() { Status DBMetaImpl::count(const std::string& group_id, long& result) { try { - auto selected = ConnectorPtr->select(columns(&GroupFileSchema::rows, + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size, &GroupFileSchema::date), where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index aca0ec3141..6108860927 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -24,6 +24,8 @@ public: virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; virtual Status add_group_file(GroupFileSchema& group_file_info) override; + virtual Status delete_group_partitions(const std::string& group_id, + const meta::DatesT& dates) override; virtual Status has_group_file(const std::string& group_id_, const std::string& file_id_, @@ -48,6 +50,10 @@ public: virtual Status files_to_index(GroupFilesSchema&) override; + virtual Status archive_files() override; + + virtual Status size(long& result) override; + virtual Status cleanup() override; virtual Status cleanup_ttl_files(uint16_t seconds) override; @@ -59,8 +65,9 @@ public: virtual ~DBMetaImpl(); private: - - long GetMicroSecTimeStamp(); + Status NextFileId(std::string& file_id); + Status NextGroupId(std::string& group_id); + Status discard_files_of_size(long to_discard_size); Status get_group_no_lock(GroupSchema& group_info); std::string GetGroupPath(const std::string& group_id); std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date); diff --git a/cpp/src/db/Exception.h b/cpp/src/db/Exception.h new file mode 100644 index 0000000000..a5b4b4c421 --- /dev/null +++ b/cpp/src/db/Exception.h @@ -0,0 +1,54 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + +namespace zilliz { +namespace vecwise { +namespace engine { + +class Exception : public std::exception { +public: + Exception(const std::string& message) + : message_(message) { + } + + Exception() + : message_() { + } + + virtual const char* what() const throw() { + if (message_.empty()) { + return "Default Exception."; + } else { + return message_.c_str(); + } + } + + virtual ~Exception() throw() {}; + +protected: + + std::string message_; +}; + +class InvalidArgumentException : public Exception { +public: + InvalidArgumentException() : Exception("Invalid Argument"){}; + InvalidArgumentException(const std::string& message) : Exception(message) {}; +}; + +class OutOfRangeException : public Exception { +public: + OutOfRangeException() : Exception("Out Of Range"){}; + OutOfRangeException(const std::string& message) : Exception(message) {}; +}; + +} // namespace engine +} // namespace vecwise +} // namespace zilliz diff --git a/cpp/src/db/FaissExecutionEngine.cpp b/cpp/src/db/FaissExecutionEngine.cpp index 605b979481..c2165c948d 100644 --- a/cpp/src/db/FaissExecutionEngine.cpp +++ b/cpp/src/db/FaissExecutionEngine.cpp @@ -47,12 +47,12 @@ size_t FaissExecutionEngine::Count() const { template size_t FaissExecutionEngine::Size() const { - return (size_t)(Count() * pIndex_->d); + return (size_t)(Count() * pIndex_->d)*sizeof(float); } template size_t FaissExecutionEngine::PhysicalSize() const { - return (size_t)(Size()*sizeof(float)); + return (size_t)(Count() * pIndex_->d)*sizeof(float); } template diff --git a/cpp/src/db/LocalMetaImpl.cpp b/cpp/src/db/LocalMetaImpl.cpp index 60c23158be..aa852a3db6 100644 --- a/cpp/src/db/LocalMetaImpl.cpp +++ b/cpp/src/db/LocalMetaImpl.cpp @@ -241,6 +241,11 @@ Status LocalMetaImpl::update_files(GroupFilesSchema& files) { return Status::OK(); } +Status LocalMetaImpl::archive_files() { + //PXU TODO + return Status::OK(); +} + Status LocalMetaImpl::cleanup() { //PXU TODO return Status::OK(); @@ -256,6 +261,11 @@ Status LocalMetaImpl::drop_all() { return Status::OK(); } +Status LocalMetaImpl::size(long& result) { + // PXU TODO + return Status::OK(); +} + Status LocalMetaImpl::count(const std::string& group_id, long& result) { // PXU TODO return Status::OK(); diff --git a/cpp/src/db/LocalMetaImpl.h b/cpp/src/db/LocalMetaImpl.h index 4c324c5796..fb989d5f67 100644 --- a/cpp/src/db/LocalMetaImpl.h +++ b/cpp/src/db/LocalMetaImpl.h @@ -22,7 +22,9 @@ public: virtual Status get_group(GroupSchema& group_info_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; - virtual Status add_group_file(GroupFileSchema& group_file_info) = 0; + virtual Status add_group_file(GroupFileSchema& group_file_info) override; + /* virtual Status delete_group_partitions(const std::string& group_id, */ + /* const meta::DatesT& dates) override; */ virtual Status has_group_file(const std::string& group_id_, const std::string& file_id_, @@ -45,12 +47,16 @@ public: virtual Status files_to_index(GroupFilesSchema&) override; + virtual Status archive_files() override; + virtual Status cleanup_ttl_files(uint16_t seconds) override; virtual Status count(const std::string& group_id, long& result) override; virtual Status drop_all() override; + virtual Status size(long& result) override; + private: Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info); diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.cpp index ede2a64522..601146ba66 100644 --- a/cpp/src/db/MemManager.cpp +++ b/cpp/src/db/MemManager.cpp @@ -13,6 +13,7 @@ #include "MemManager.h" #include "Meta.h" +#include "MetaConsts.h" namespace zilliz { @@ -48,16 +49,16 @@ size_t MemVectors::approximate_size() const { template Status MemVectors::serialize(std::string& group_id) { group_id = schema_.group_id; - auto rows = approximate_size(); + auto size = approximate_size(); pEE_->Serialize(); - schema_.rows = rows; - schema_.file_type = (rows >= options_.index_trigger_size) ? + schema_.size = size; + schema_.file_type = (size >= options_.index_trigger_size) ? meta::GroupFileSchema::TO_INDEX : meta::GroupFileSchema::RAW; auto status = pMeta_->update_group_file(schema_); LOG(DEBUG) << "New " << ((schema_.file_type == meta::GroupFileSchema::RAW) ? "raw" : "to_index") - << " file " << schema_.file_id << " of size " << pEE_->PhysicalSize() / (1024*1024) << " M"; + << " file " << schema_.file_id << " of size " << pEE_->Size() / meta::M << " M"; pEE_->Cache(); diff --git a/cpp/src/db/Meta.cpp b/cpp/src/db/Meta.cpp index 1b97c06c79..3d272e0637 100644 --- a/cpp/src/db/Meta.cpp +++ b/cpp/src/db/Meta.cpp @@ -4,6 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ #include +#include #include "Meta.h" namespace zilliz { @@ -11,13 +12,33 @@ namespace vecwise { namespace engine { namespace meta { -DateT Meta::GetDate(const std::time_t& t) { - tm *ltm = std::localtime(&t); - return ltm->tm_year*10000 + ltm->tm_mon*100 + ltm->tm_mday; +DateT Meta::GetDate(const std::time_t& t, int day_delta) { + struct tm ltm; + localtime_r(&t, <m); + if (day_delta > 0) { + do { + ++ltm.tm_mday; + --day_delta; + } while(day_delta > 0); + mktime(<m); + } else if (day_delta < 0) { + do { + --ltm.tm_mday; + ++day_delta; + } while(day_delta < 0); + mktime(<m); + } else { + ltm.tm_mday; + } + return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday; +} + +DateT Meta::GetDateWithDelta(int day_delta) { + return GetDate(std::time(nullptr), day_delta); } DateT Meta::GetDate() { - return GetDate(std::time(nullptr)); + return GetDate(std::time(nullptr), 0); } } // namespace meta diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index e0c1a84c76..a4bbb23380 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -4,14 +4,11 @@ * Proprietary and confidential. ******************************************************************************/ #pragma once - -#include #include -#include -#include #include #include +#include "MetaTypes.h" #include "Options.h" #include "Status.h" @@ -20,44 +17,7 @@ namespace vecwise { namespace engine { namespace meta { -typedef int DateT; -const DateT EmptyDate = -1; -typedef std::vector DatesT; -struct GroupSchema { - size_t id; - std::string group_id; - size_t files_cnt = 0; - uint16_t dimension; - std::string location = ""; -}; // GroupSchema - - -struct GroupFileSchema { - typedef enum { - NEW, - RAW, - TO_INDEX, - INDEX, - TO_DELETE, - } FILE_TYPE; - - size_t id; - std::string group_id; - std::string file_id; - int file_type = NEW; - size_t rows; - DateT date = EmptyDate; - uint16_t dimension; - std::string location = ""; - long updated_time; -}; // GroupFileSchema - -typedef std::vector GroupFilesSchema; -typedef std::map DatePartionedGroupFilesSchema; - - -class Meta; class Meta { public: typedef std::shared_ptr Ptr; @@ -67,6 +27,8 @@ public: virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0; virtual Status add_group_file(GroupFileSchema& group_file_info) = 0; + virtual Status delete_group_partitions(const std::string& group_id, + const meta::DatesT& dates) = 0; virtual Status has_group_file(const std::string& group_id_, const std::string& file_id_, @@ -89,6 +51,10 @@ public: virtual Status files_to_merge(const std::string& group_id, DatePartionedGroupFilesSchema& files) = 0; + virtual Status size(long& result) = 0; + + virtual Status archive_files() = 0; + virtual Status files_to_index(GroupFilesSchema&) = 0; virtual Status cleanup() = 0; @@ -98,8 +64,9 @@ public: virtual Status count(const std::string& group_id, long& result) = 0; - static DateT GetDate(const std::time_t& t); + static DateT GetDate(const std::time_t& t, int day_delta = 0); static DateT GetDate(); + static DateT GetDateWithDelta(int day_delta); }; // MetaData diff --git a/cpp/src/db/MetaConsts.h b/cpp/src/db/MetaConsts.h new file mode 100644 index 0000000000..e4247510c6 --- /dev/null +++ b/cpp/src/db/MetaConsts.h @@ -0,0 +1,32 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +namespace zilliz { +namespace vecwise { +namespace engine { +namespace meta { + +const size_t K = 1024UL; +const size_t M = K*K; +const size_t G = K*M; +const size_t T = K*G; + +const size_t S_PS = 1UL; +const size_t MS_PS = 1000*S_PS; +const size_t US_PS = 1000*MS_PS; +const size_t NS_PS = 1000*US_PS; + +const size_t SECOND = 1UL; +const size_t M_SEC = 60*SECOND; +const size_t H_SEC = 60*M_SEC; +const size_t D_SEC = 24*H_SEC; +const size_t W_SEC = 7*D_SEC; + +} // namespace meta +} // namespace engine +} // namespace vecwise +} // namespace zilliz diff --git a/cpp/src/db/MetaTypes.h b/cpp/src/db/MetaTypes.h new file mode 100644 index 0000000000..4b956590ae --- /dev/null +++ b/cpp/src/db/MetaTypes.h @@ -0,0 +1,57 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include + +namespace zilliz { +namespace vecwise { +namespace engine { +namespace meta { + +typedef int DateT; +const DateT EmptyDate = -1; +typedef std::vector DatesT; + +struct GroupSchema { + size_t id; + std::string group_id; + size_t files_cnt = 0; + uint16_t dimension; + std::string location = ""; + long created_on; +}; // GroupSchema + +struct GroupFileSchema { + typedef enum { + NEW, + RAW, + TO_INDEX, + INDEX, + TO_DELETE, + } FILE_TYPE; + + size_t id; + std::string group_id; + std::string file_id; + int file_type = NEW; + size_t size; + DateT date = EmptyDate; + uint16_t dimension; + std::string location = ""; + long updated_time; + long created_on; +}; // GroupFileSchema + +typedef std::vector GroupFilesSchema; +typedef std::map DatePartionedGroupFilesSchema; + +} // namespace meta +} // namespace engine +} // namespace vecwise +} // namespace zilliz diff --git a/cpp/src/db/Options.cpp b/cpp/src/db/Options.cpp index 2a0c01af8b..dfd6311b61 100644 --- a/cpp/src/db/Options.cpp +++ b/cpp/src/db/Options.cpp @@ -3,9 +3,15 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ +#include +#include +#include +#include + #include "Options.h" #include "Env.h" #include "DBMetaImpl.h" +#include "Exception.h" namespace zilliz { namespace vecwise { @@ -15,10 +21,54 @@ Options::Options() : env(Env::Default()) { } -/* DBMetaOptions::DBMetaOptions(const std::string& dbpath, */ -/* const std::string& uri) */ -/* : path(dbpath), backend_uri(uri) { */ -/* } */ +ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) { + ParseType(type); + ParseCritirias(criterias); +} + +void ArchiveConf::ParseCritirias(const std::string& criterias) { + std::stringstream ss(criterias); + std::vector tokens; + + boost::algorithm::split(tokens, criterias, boost::is_any_of(";")); + + if (tokens.size() == 0) { + return; + } + + for (auto& token : tokens) { + std::vector kv; + boost::algorithm::split(kv, token, boost::is_any_of(":")); + if (kv.size() != 2) { + LOG(WARNING) << "Invalid ArchiveConf Criterias: " << token << " Ignore!"; + continue; + } + if (kv[0] != "disk" && kv[0] != "days") { + LOG(WARNING) << "Invalid ArchiveConf Criterias: " << token << " Ignore!"; + continue; + } + try { + auto value = std::stoi(kv[1]); + criterias_[kv[0]] = value; + } + catch (std::out_of_range&){ + LOG(ERROR) << "Out of range: '" << kv[1] << "'"; + throw OutOfRangeException(); + } + catch (...){ + LOG(ERROR) << "Invalid argument: '" << kv[1] << "'"; + throw InvalidArgumentException(); + } + } +} + +void ArchiveConf::ParseType(const std::string& type) { + if (type != "delete" && type != "swap") { + LOG(ERROR) << "Invalid argument: type='" << type << "'"; + throw InvalidArgumentException(); + } + type_ = type; +} } // namespace engine } // namespace vecwise diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index 5bbcf6dabe..100ab7b3ab 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -7,6 +7,7 @@ #include #include +#include namespace zilliz { namespace vecwise { @@ -14,10 +15,26 @@ namespace engine { class Env; +struct ArchiveConf { + using CriteriaT = std::map; + + ArchiveConf(const std::string& type, const std::string& criterias = "disk:512"); + + const std::string& GetType() const { return type_; } + const CriteriaT GetCriterias() const { return criterias_; } + +private: + void ParseCritirias(const std::string& type); + void ParseType(const std::string& criterias); + + std::string type_; + CriteriaT criterias_; +}; + struct DBMetaOptions { - /* DBMetaOptions(const std::string&, const std::string&); */ std::string path; std::string backend_uri; + ArchiveConf archive_conf = ArchiveConf("delete"); }; // DBMetaOptions @@ -25,7 +42,7 @@ struct Options { Options(); uint16_t memory_sync_interval = 1; uint16_t merge_trigger_number = 2; - size_t index_trigger_size = 1024*1024*256; + size_t index_trigger_size = 1024*1024*1024; Env* env; DBMetaOptions meta; }; // Options diff --git a/cpp/src/db/Utils.cpp b/cpp/src/db/Utils.cpp new file mode 100644 index 0000000000..e459bab4bb --- /dev/null +++ b/cpp/src/db/Utils.cpp @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include +#include "Utils.h" + +namespace zilliz { +namespace vecwise { +namespace engine { +namespace utils { + +long GetMicroSecTimeStamp() { + auto now = std::chrono::system_clock::now(); + auto micros = std::chrono::duration_cast( + now.time_since_epoch()).count(); + + return micros; +} + +} // namespace utils +} // namespace engine +} // namespace vecwise +} // namespace zilliz diff --git a/cpp/src/db/Utils.h b/cpp/src/db/Utils.h new file mode 100644 index 0000000000..cdcd37b832 --- /dev/null +++ b/cpp/src/db/Utils.h @@ -0,0 +1,19 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + + +namespace zilliz { +namespace vecwise { +namespace engine { +namespace utils { + +long GetMicroSecTimeStamp(); + +} // namespace utils +} // namespace engine +} // namespace vecwise +} // namespace zilliz diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index c9bc958b99..f8ced228e0 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -9,9 +9,103 @@ #include "utils.h" #include "db/DB.h" +#include "db/DBImpl.h" +#include "db/MetaConsts.h" using namespace zilliz::vecwise; +TEST_F(DBTest, CONFIG_TEST) { + { + ASSERT_ANY_THROW(engine::ArchiveConf conf("wrong")); + /* EXPECT_DEATH(engine::ArchiveConf conf("wrong"), ""); */ + } + { + engine::ArchiveConf conf("delete"); + ASSERT_EQ(conf.GetType(), "delete"); + auto criterias = conf.GetCriterias(); + ASSERT_TRUE(criterias.size() == 1); + ASSERT_TRUE(criterias["disk"] == 512); + } + { + engine::ArchiveConf conf("swap"); + ASSERT_EQ(conf.GetType(), "swap"); + auto criterias = conf.GetCriterias(); + ASSERT_TRUE(criterias.size() == 1); + ASSERT_TRUE(criterias["disk"] == 512); + } + { + ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "disk:")); + ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "disk:a")); + engine::ArchiveConf conf("swap", "disk:1024"); + auto criterias = conf.GetCriterias(); + ASSERT_TRUE(criterias.size() == 1); + ASSERT_TRUE(criterias["disk"] == 1024); + } + { + ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "days:")); + ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "days:a")); + engine::ArchiveConf conf("swap", "days:100"); + auto criterias = conf.GetCriterias(); + ASSERT_TRUE(criterias.size() == 1); + ASSERT_TRUE(criterias["days"] == 100); + } + { + ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "days:")); + ASSERT_ANY_THROW(engine::ArchiveConf conf2("swap", "days:a")); + engine::ArchiveConf conf("swap", "days:100;disk:200"); + auto criterias = conf.GetCriterias(); + ASSERT_TRUE(criterias.size() == 2); + ASSERT_TRUE(criterias["days"] == 100); + ASSERT_TRUE(criterias["disk"] == 200); + } +} + +TEST_F(DBTest2, ARHIVE_DISK_CHECK) { + + static const std::string group_name = "test_group"; + static const int group_dim = 256; + long size; + + engine::meta::GroupSchema group_info; + group_info.dimension = group_dim; + group_info.group_id = group_name; + engine::Status stat = db_->add_group(group_info); + + engine::meta::GroupSchema group_info_get; + group_info_get.group_id = group_name; + stat = db_->get_group(group_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(group_info_get.dimension, group_dim); + + engine::IDNumbers vector_ids; + engine::IDNumbers target_ids; + + db_->size(size); + int d = 256; + int nb = 20; + float *xb = new float[d * nb]; + for(int i = 0; i < nb; i++) { + for(int j = 0; j < d; j++) xb[d * i + j] = drand48(); + xb[d * i] += i / 2000.; + } + + int loop = 100000; + + for (auto i=0; iadd_vectors(group_name, nb, xb, vector_ids); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + db_->size(size); + LOG(DEBUG) << "size=" << size; + ASSERT_TRUE(size < 1 * engine::meta::G); + + delete [] xb; +}; + + TEST_F(DBTest, DB_TEST) { static const std::string group_name = "test_group"; @@ -63,7 +157,7 @@ TEST_F(DBTest, DB_TEST) { START_TIMER; stat = db_->search(group_name, k, qb, qxb, results); - ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/(1024*1024) << " M"; + ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/engine::meta::M << " M"; STOP_TIMER(ss.str()); ASSERT_STATS(stat); diff --git a/cpp/unittest/db/meta_tests.cpp b/cpp/unittest/db/meta_tests.cpp index 2ede539803..3832a3263d 100644 --- a/cpp/unittest/db/meta_tests.cpp +++ b/cpp/unittest/db/meta_tests.cpp @@ -6,10 +6,14 @@ #include #include #include +#include +#include #include "utils.h" #include "db/DBMetaImpl.h" #include "db/Factories.h" +#include "db/Utils.h" +#include "db/MetaConsts.h" using namespace zilliz::vecwise::engine; @@ -59,10 +63,124 @@ TEST_F(MetaTest, GROUP_FILE_TEST) { ASSERT_TRUE(status.ok()); ASSERT_EQ(group_file.file_type, new_file_type); - /* group_file.file_type = meta::GroupFileSchema::NEW; */ - /* status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file); */ - /* ASSERT_TRUE(status.ok()); */ - /* ASSERT_EQ(group_file.file_type, new_file_type); */ + meta::DatesT dates; + dates.push_back(meta::Meta::GetDate()); + status = impl_->delete_group_partitions(group_file.group_id, dates); + ASSERT_FALSE(status.ok()); + + dates.clear(); + for (auto i=2; i < 10; ++i) { + dates.push_back(meta::Meta::GetDateWithDelta(-1*i)); + } + status = impl_->delete_group_partitions(group_file.group_id, dates); + ASSERT_TRUE(status.ok()); + + group_file.date = meta::Meta::GetDateWithDelta(-2); + status = impl_->update_group_file(group_file); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(group_file.date, meta::Meta::GetDateWithDelta(-2)); + ASSERT_FALSE(group_file.file_type == meta::GroupFileSchema::TO_DELETE); + + dates.clear(); + dates.push_back(group_file.date); + status = impl_->delete_group_partitions(group_file.group_id, dates); + ASSERT_TRUE(status.ok()); + status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(group_file.file_type == meta::GroupFileSchema::TO_DELETE); +} + +TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { + srand(time(0)); + DBMetaOptions options; + options.path = "/tmp/vecwise_test"; + int days_num = rand() % 100; + std::stringstream ss; + ss << "days:" << days_num; + options.archive_conf = ArchiveConf("delete", ss.str()); + + auto impl = meta::DBMetaImpl(options); + auto group_id = "meta_test_group"; + + meta::GroupSchema group; + group.group_id = group_id; + auto status = impl.add_group(group); + + meta::GroupFilesSchema files; + meta::GroupFileSchema group_file; + group_file.group_id = group.group_id; + + auto cnt = 100; + long ts = utils::GetMicroSecTimeStamp(); + std::vector days; + for (auto i=0; i