diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 2c17a1dc8f..bffea82ef0 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -242,6 +242,8 @@ Status DBImpl::background_merge_files(const std::string& group_id) { try_build_index(); } + _pMeta->cleanup_ttl_files(1); + return Status::OK(); } @@ -255,13 +257,18 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { } auto opd = std::make_shared(); + opd->d = file.dimension; opd->index_type = "IDMap,Flat"; IndexBuilderPtr pBuilder = GetIndexBuilder(opd); auto from_index = dynamic_cast(faiss::read_index(file.location.c_str())); + std::cout << "Preparing build_index for file_id=" << file.file_id + << " with new index_file_id=" << group_file.file_id << std::endl; auto index = pBuilder->build_all(from_index->ntotal, dynamic_cast(from_index->index)->xb.data(), from_index->id_map.data()); + std::cout << "Ending build_index for file_id=" << file.file_id + << " with new index_file_id=" << group_file.file_id << std::endl; /* std::cout << "raw size=" << from_index->ntotal << " index size=" << index->ntotal << std::endl; */ write_index(index, group_file.location.c_str()); group_file.file_type = meta::GroupFileSchema::INDEX; diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index fe53936505..45a92fee6c 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "DBMetaImpl.h" @@ -28,6 +29,7 @@ inline auto StoragePrototype(const std::string& path) { make_column("file_id", &GroupFileSchema::file_id), make_column("file_type", &GroupFileSchema::file_type), make_column("rows", &GroupFileSchema::rows, default_value(0)), + make_column("updated_time", &GroupFileSchema::updated_time), make_column("date", &GroupFileSchema::date)) ); @@ -47,6 +49,14 @@ std::string DBMetaImpl::GetGroupPath(const std::string& group_id) { return _options.path + "/" + group_id; } +long DBMetaImpl::GetMicroSecTimeStamp() { + auto now = std::chrono::system_clock::now(); + auto micros = std::chrono::duration_cast( + now.time_since_epoch()).count(); + + return micros; +} + std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) { std::stringstream ss; ss << GetGroupPath(group_id) << "/" << date; @@ -166,6 +176,7 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { group_file.file_id = ss.str(); group_file.dimension = group_info.dimension; group_file.rows = 0; + group_file.updated_time = GetMicroSecTimeStamp(); //ConnectorPtr->select(datetime("now", "localtime +1 hour")).front(); GetGroupFilePath(group_file); { @@ -329,7 +340,8 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_, return Status::OK(); } -Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file) { +Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { + group_file.updated_time = GetMicroSecTimeStamp(); auto commited = ConnectorPtr->transaction([&] () mutable { ConnectorPtr->update(group_file); return true; @@ -340,9 +352,10 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file) { return Status::OK(); } -Status DBMetaImpl::update_files(const GroupFilesSchema& files) { +Status DBMetaImpl::update_files(GroupFilesSchema& files) { auto commited = ConnectorPtr->transaction([&] () mutable { for (auto& file : files) { + file.updated_time = GetMicroSecTimeStamp(); ConnectorPtr->update(file); } return true; @@ -353,6 +366,38 @@ Status DBMetaImpl::update_files(const GroupFilesSchema& files) { return Status::OK(); } +Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) { + auto now = GetMicroSecTimeStamp(); + auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, + &GroupFileSchema::group_id, + &GroupFileSchema::file_id, + &GroupFileSchema::file_type, + &GroupFileSchema::rows, + &GroupFileSchema::date), + where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and + c(&GroupFileSchema::updated_time) > now - 1000000*seconds)); + + GroupFilesSchema updated; + + for (auto& file : selected) { + GroupFileSchema group_file; + group_file.id = std::get<0>(file); + group_file.group_id = std::get<1>(file); + group_file.file_id = std::get<2>(file); + group_file.file_type = std::get<3>(file); + group_file.rows = std::get<4>(file); + group_file.date = std::get<5>(file); + GetGroupFilePath(group_file); + if (group_file.file_type == GroupFileSchema::TO_DELETE) { + boost::filesystem::remove(group_file.location); + } + ConnectorPtr->remove(group_file.id); + std::cout << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; + } + + return Status::OK(); +} + Status DBMetaImpl::cleanup() { auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 8def7aea32..7ac0443343 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -27,13 +27,13 @@ public: virtual Status get_group_file(const std::string& group_id_, const std::string& file_id_, GroupFileSchema& group_file_info_) override; - virtual Status update_group_file(const GroupFileSchema& group_file_) override; + virtual Status update_group_file(GroupFileSchema& group_file_) override; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, GroupFilesSchema& group_files_info_) override; - virtual Status update_files(const GroupFilesSchema& files) override; + virtual Status update_files(GroupFilesSchema& files) override; virtual Status files_to_merge(const std::string& group_id, DatePartionedGroupFilesSchema& files) override; @@ -46,10 +46,13 @@ public: virtual Status cleanup() override; + virtual Status cleanup_ttl_files(uint16_t seconds) override; + virtual ~DBMetaImpl(); private: + long GetMicroSecTimeStamp(); Status get_group_no_lock(GroupSchema& group_info); std::string GetGroupPath(const std::string& group_id); std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date); diff --git a/cpp/src/db/LocalMetaImpl.cpp b/cpp/src/db/LocalMetaImpl.cpp index 7bdb507647..f7192a05b7 100644 --- a/cpp/src/db/LocalMetaImpl.cpp +++ b/cpp/src/db/LocalMetaImpl.cpp @@ -226,12 +226,12 @@ Status LocalMetaImpl::get_group_files(const std::string& group_id_, return Status::OK(); } -Status LocalMetaImpl::update_group_file(const GroupFileSchema& group_file_) { +Status LocalMetaImpl::update_group_file(GroupFileSchema& group_file_) { //PXU TODO return Status::OK(); } -Status LocalMetaImpl::update_files(const GroupFilesSchema& files) { +Status LocalMetaImpl::update_files(GroupFilesSchema& files) { //PXU TODO return Status::OK(); } @@ -241,6 +241,11 @@ Status LocalMetaImpl::cleanup() { return Status::OK(); } +Status LocalMetaImpl::cleanup_ttl_files(uint16_t seconds) { + // PXU TODO + return Status::OK(); +} + } // namespace meta } // namespace engine } // namespace vecwise diff --git a/cpp/src/db/LocalMetaImpl.h b/cpp/src/db/LocalMetaImpl.h index f74916c087..6c687500e1 100644 --- a/cpp/src/db/LocalMetaImpl.h +++ b/cpp/src/db/LocalMetaImpl.h @@ -26,13 +26,13 @@ public: virtual Status get_group_file(const std::string& group_id_, const std::string& file_id_, GroupFileSchema& group_file_info_) override; - virtual Status update_group_file(const GroupFileSchema& group_file_) override; + virtual Status update_group_file(GroupFileSchema& group_file_) override; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, GroupFilesSchema& group_files_info_) override; - virtual Status update_files(const GroupFilesSchema& files) override; + virtual Status update_files(GroupFilesSchema& files) override; virtual Status cleanup() override; @@ -41,6 +41,8 @@ public: virtual Status files_to_index(GroupFilesSchema&) override; + virtual Status cleanup_ttl_files(uint16_t seconds) override; + private: Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info); diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index 0a795cdd83..4dbc275f65 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -42,6 +42,7 @@ struct GroupFileSchema { DateT date = EmptyDate; uint16_t dimension; std::string location = ""; + long updated_time; }; // GroupFileSchema typedef std::vector GroupFilesSchema; @@ -62,13 +63,13 @@ public: virtual Status get_group_file(const std::string& group_id_, const std::string& file_id_, GroupFileSchema& group_file_info_) = 0; - virtual Status update_group_file(const GroupFileSchema& group_file_) = 0; + virtual Status update_group_file(GroupFileSchema& group_file_) = 0; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, GroupFilesSchema& group_files_info_) = 0; - virtual Status update_files(const GroupFilesSchema& files) = 0; + virtual Status update_files(GroupFilesSchema& files) = 0; virtual Status files_to_search(const std::string& group_id, std::vector partition, @@ -80,6 +81,7 @@ public: virtual Status files_to_index(GroupFilesSchema&) = 0; virtual Status cleanup() = 0; + virtual Status cleanup_ttl_files(uint16_t) = 0; static DateT GetDate(const std::time_t& t); static DateT GetDate();