From b377266934dfa8650c3e34371736f933df1d3d8a Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Fri, 19 Apr 2019 19:00:29 +0800 Subject: [PATCH 1/4] feat(db): add some print Former-commit-id: 200b365c40532a13d777457bd53e6788fd613559 --- cpp/src/db/DBImpl.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 2c17a1dc8f..8f0fc27715 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -255,13 +255,18 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { } auto opd = std::make_shared(); + opd->d = file.dimension; opd->index_type = "IDMap,Flat"; IndexBuilderPtr pBuilder = GetIndexBuilder(opd); auto from_index = dynamic_cast(faiss::read_index(file.location.c_str())); + std::cout << "Preparing build_index for file_id=" << file.file_id + << " with new index_file_id=" << group_file.file_id << std::endl; auto index = pBuilder->build_all(from_index->ntotal, dynamic_cast(from_index->index)->xb.data(), from_index->id_map.data()); + std::cout << "Ending build_index for file_id=" << file.file_id + << " with new index_file_id=" << group_file.file_id << std::endl; /* std::cout << "raw size=" << from_index->ntotal << " index size=" << index->ntotal << std::endl; */ write_index(index, group_file.location.c_str()); group_file.file_type = meta::GroupFileSchema::INDEX; From 216aeb6bd03a291b0e0e295e99ba662a985284bc Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Mon, 22 Apr 2019 14:15:45 +0800 Subject: [PATCH 2/4] feat(db): add updated_time in group_file meta Former-commit-id: 0a2c3549b8b82267db3facc9606d8f8f90b02de2 --- cpp/src/db/DBMetaImpl.cpp | 8 ++++++-- cpp/src/db/DBMetaImpl.h | 4 ++-- cpp/src/db/LocalMetaImpl.cpp | 4 ++-- cpp/src/db/LocalMetaImpl.h | 4 ++-- cpp/src/db/Meta.h | 5 +++-- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index fe53936505..a0a3cdb3cd 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -28,6 +28,7 @@ inline auto StoragePrototype(const std::string& path) { make_column("file_id", &GroupFileSchema::file_id), make_column("file_type", &GroupFileSchema::file_type), make_column("rows", &GroupFileSchema::rows, default_value(0)), + make_column("updated_time", &GroupFileSchema::updated_time), make_column("date", &GroupFileSchema::date)) ); @@ -166,6 +167,7 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { group_file.file_id = ss.str(); group_file.dimension = group_info.dimension; group_file.rows = 0; + group_file.updated_time = ConnectorPtr->select(datetime("now", "localtime")).front(); GetGroupFilePath(group_file); { @@ -329,7 +331,8 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_, return Status::OK(); } -Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file) { +Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { + group_file.updated_time = ConnectorPtr->select(datetime("now", "localtime")).front(); auto commited = ConnectorPtr->transaction([&] () mutable { ConnectorPtr->update(group_file); return true; @@ -340,9 +343,10 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file) { return Status::OK(); } -Status DBMetaImpl::update_files(const GroupFilesSchema& files) { +Status DBMetaImpl::update_files(GroupFilesSchema& files) { auto commited = ConnectorPtr->transaction([&] () mutable { for (auto& file : files) { + file.updated_time = ConnectorPtr->select(datetime("now", "localtime")).front(); ConnectorPtr->update(file); } return true; diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 8def7aea32..7cfed9d2e6 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -27,13 +27,13 @@ public: virtual Status get_group_file(const std::string& group_id_, const std::string& file_id_, GroupFileSchema& group_file_info_) override; - virtual Status update_group_file(const GroupFileSchema& group_file_) override; + virtual Status update_group_file(GroupFileSchema& group_file_) override; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, GroupFilesSchema& group_files_info_) override; - virtual Status update_files(const GroupFilesSchema& files) override; + virtual Status update_files(GroupFilesSchema& files) override; virtual Status files_to_merge(const std::string& group_id, DatePartionedGroupFilesSchema& files) override; diff --git a/cpp/src/db/LocalMetaImpl.cpp b/cpp/src/db/LocalMetaImpl.cpp index 7bdb507647..07a26240e9 100644 --- a/cpp/src/db/LocalMetaImpl.cpp +++ b/cpp/src/db/LocalMetaImpl.cpp @@ -226,12 +226,12 @@ Status LocalMetaImpl::get_group_files(const std::string& group_id_, return Status::OK(); } -Status LocalMetaImpl::update_group_file(const GroupFileSchema& group_file_) { +Status LocalMetaImpl::update_group_file(GroupFileSchema& group_file_) { //PXU TODO return Status::OK(); } -Status LocalMetaImpl::update_files(const GroupFilesSchema& files) { +Status LocalMetaImpl::update_files(GroupFilesSchema& files) { //PXU TODO return Status::OK(); } diff --git a/cpp/src/db/LocalMetaImpl.h b/cpp/src/db/LocalMetaImpl.h index f74916c087..8d6bb57076 100644 --- a/cpp/src/db/LocalMetaImpl.h +++ b/cpp/src/db/LocalMetaImpl.h @@ -26,13 +26,13 @@ public: virtual Status get_group_file(const std::string& group_id_, const std::string& file_id_, GroupFileSchema& group_file_info_) override; - virtual Status update_group_file(const GroupFileSchema& group_file_) override; + virtual Status update_group_file(GroupFileSchema& group_file_) override; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, GroupFilesSchema& group_files_info_) override; - virtual Status update_files(const GroupFilesSchema& files) override; + virtual Status update_files(GroupFilesSchema& files) override; virtual Status cleanup() override; diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index 0a795cdd83..ad76ebddd1 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -42,6 +42,7 @@ struct GroupFileSchema { DateT date = EmptyDate; uint16_t dimension; std::string location = ""; + std::string updated_time = ""; }; // GroupFileSchema typedef std::vector GroupFilesSchema; @@ -62,13 +63,13 @@ public: virtual Status get_group_file(const std::string& group_id_, const std::string& file_id_, GroupFileSchema& group_file_info_) = 0; - virtual Status update_group_file(const GroupFileSchema& group_file_) = 0; + virtual Status update_group_file(GroupFileSchema& group_file_) = 0; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, GroupFilesSchema& group_files_info_) = 0; - virtual Status update_files(const GroupFilesSchema& files) = 0; + virtual Status update_files(GroupFilesSchema& files) = 0; virtual Status files_to_search(const std::string& group_id, std::vector partition, From abacf3e234281a0f723099bdc01d7f664e7f016d Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Mon, 22 Apr 2019 14:50:47 +0800 Subject: [PATCH 3/4] feat(db): change updated_time type of long Former-commit-id: e58de06418bdb6b4f40f646ff5dc3d6a0efb12c0 --- cpp/src/db/DBMetaImpl.cpp | 15 ++++++++++++--- cpp/src/db/DBMetaImpl.h | 1 + cpp/src/db/Meta.h | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index a0a3cdb3cd..194570d13b 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "DBMetaImpl.h" @@ -48,6 +49,14 @@ std::string DBMetaImpl::GetGroupPath(const std::string& group_id) { return _options.path + "/" + group_id; } +long DBMetaImpl::GetMicroSecTimeStamp() { + auto now = std::chrono::system_clock::now(); + auto micros = std::chrono::duration_cast( + now.time_since_epoch()).count(); + + return micros; +} + std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) { std::stringstream ss; ss << GetGroupPath(group_id) << "/" << date; @@ -167,7 +176,7 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { group_file.file_id = ss.str(); group_file.dimension = group_info.dimension; group_file.rows = 0; - group_file.updated_time = ConnectorPtr->select(datetime("now", "localtime")).front(); + group_file.updated_time = GetMicroSecTimeStamp(); //ConnectorPtr->select(datetime("now", "localtime +1 hour")).front(); GetGroupFilePath(group_file); { @@ -332,7 +341,7 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_, } Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { - group_file.updated_time = ConnectorPtr->select(datetime("now", "localtime")).front(); + group_file.updated_time = GetMicroSecTimeStamp(); auto commited = ConnectorPtr->transaction([&] () mutable { ConnectorPtr->update(group_file); return true; @@ -346,7 +355,7 @@ Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { Status DBMetaImpl::update_files(GroupFilesSchema& files) { auto commited = ConnectorPtr->transaction([&] () mutable { for (auto& file : files) { - file.updated_time = ConnectorPtr->select(datetime("now", "localtime")).front(); + file.updated_time = GetMicroSecTimeStamp(); ConnectorPtr->update(file); } return true; diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 7cfed9d2e6..726ce15bec 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -50,6 +50,7 @@ public: private: + long GetMicroSecTimeStamp(); Status get_group_no_lock(GroupSchema& group_info); std::string GetGroupPath(const std::string& group_id); std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date); diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index ad76ebddd1..c630ab2101 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -42,7 +42,7 @@ struct GroupFileSchema { DateT date = EmptyDate; uint16_t dimension; std::string location = ""; - std::string updated_time = ""; + long updated_time; }; // GroupFileSchema typedef std::vector GroupFilesSchema; From 2943126bda2b68e70b852cf0b8c215fdea2790c5 Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Mon, 22 Apr 2019 15:08:26 +0800 Subject: [PATCH 4/4] feat(db): add ttl files cleanup api Former-commit-id: bb84f6f2baa8c20c9f0e745c67213af236609507 --- cpp/src/db/DBImpl.cpp | 2 ++ cpp/src/db/DBMetaImpl.cpp | 32 ++++++++++++++++++++++++++++++++ cpp/src/db/DBMetaImpl.h | 2 ++ cpp/src/db/LocalMetaImpl.cpp | 5 +++++ cpp/src/db/LocalMetaImpl.h | 2 ++ cpp/src/db/Meta.h | 1 + 6 files changed, 44 insertions(+) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 8f0fc27715..bffea82ef0 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -242,6 +242,8 @@ Status DBImpl::background_merge_files(const std::string& group_id) { try_build_index(); } + _pMeta->cleanup_ttl_files(1); + return Status::OK(); } diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 194570d13b..45a92fee6c 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -366,6 +366,38 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) { return Status::OK(); } +Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) { + auto now = GetMicroSecTimeStamp(); + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows, + &GroupFileSchema::date), + where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and + c(&GroupFileSchema::updated_time) > now - 1000000*seconds)); + + GroupFilesSchema updated; + + for (auto& file : selected) { + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.group_id = std::get<1>(file); + group_file.file_id = std::get<2>(file); + group_file.file_type = std::get<3>(file); + group_file.rows = std::get<4>(file); + group_file.date = std::get<5>(file); + GetGroupFilePath(group_file); + if (group_file.file_type == GroupFileSchema::TO_DELETE) { + boost::filesystem::remove(group_file.location); + } + ConnectorPtr->remove(group_file.id); + std::cout << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; + } + + return Status::OK(); +} + Status DBMetaImpl::cleanup() { auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 726ce15bec..7ac0443343 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -46,6 +46,8 @@ public: virtual Status cleanup() override; + virtual Status cleanup_ttl_files(uint16_t seconds) override; + virtual ~DBMetaImpl(); private: diff --git a/cpp/src/db/LocalMetaImpl.cpp b/cpp/src/db/LocalMetaImpl.cpp index 07a26240e9..f7192a05b7 100644 --- a/cpp/src/db/LocalMetaImpl.cpp +++ b/cpp/src/db/LocalMetaImpl.cpp @@ -241,6 +241,11 @@ Status LocalMetaImpl::cleanup() { return Status::OK(); } +Status LocalMetaImpl::cleanup_ttl_files(uint16_t seconds) { + // PXU TODO + return Status::OK(); +} + } // namespace meta } // namespace engine } // namespace vecwise diff --git a/cpp/src/db/LocalMetaImpl.h b/cpp/src/db/LocalMetaImpl.h index 8d6bb57076..6c687500e1 100644 --- a/cpp/src/db/LocalMetaImpl.h +++ b/cpp/src/db/LocalMetaImpl.h @@ -41,6 +41,8 @@ public: virtual Status files_to_index(GroupFilesSchema&) override; + virtual Status cleanup_ttl_files(uint16_t seconds) override; + private: Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info); diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index c630ab2101..4dbc275f65 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -81,6 +81,7 @@ public: virtual Status files_to_index(GroupFilesSchema&) = 0; virtual Status cleanup() = 0; + virtual Status cleanup_ttl_files(uint16_t) = 0; static DateT GetDate(const std::time_t& t); static DateT GetDate();