diff --git a/cpp/src/db/db.h b/cpp/src/db/db.h index c14822b041..74dfe93edf 100644 --- a/cpp/src/db/db.h +++ b/cpp/src/db/db.h @@ -19,12 +19,12 @@ public: virtual Status add_group(const GroupOptions& options_, const std::string& group_id_, - GroupSchema& group_info_) = 0; - virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) = 0; + meta::GroupSchema& group_info_) = 0; + virtual Status get_group(const std::string& group_id_, meta::GroupSchema& group_info_) = 0; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, - GroupFilesSchema& group_files_info_) = 0; + meta::GroupFilesSchema& group_files_info_) = 0; virtual Status add_vectors(const std::string& group_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) = 0; diff --git a/cpp/src/db/db_impl.cpp b/cpp/src/db/db_impl.cpp index 87c83e7027..723b80cb4a 100644 --- a/cpp/src/db/db_impl.cpp +++ b/cpp/src/db/db_impl.cpp @@ -1,6 +1,10 @@ #include #include #include +#include +#include +#include +#include #include "db_impl.h" #include "db_meta_impl.h" #include "env.h" @@ -15,21 +19,21 @@ DBImpl::DBImpl(const Options& options_, const std::string& name_) _options(options_), _bg_compaction_scheduled(false), _shutting_down(false), - _pMeta(new DBMetaImpl(*(_options.pMetaOptions))), + _pMeta(new meta::DBMetaImpl(*(_options.pMetaOptions))), _pMemMgr(new MemManager(_pMeta)) { start_timer_task(options_.memory_sync_interval); } Status DBImpl::add_group(const GroupOptions& options, const std::string& group_id, - GroupSchema& group_info) { + meta::GroupSchema& group_info) { assert((!options.has_id) || (options.has_id && ("" != group_id))); return _pMeta->add_group(options, group_id, group_info); } -Status DBImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) { +Status DBImpl::get_group(const std::string& group_id_, meta::GroupSchema& group_info_) { return _pMeta->get_group(group_id_, group_info_); } @@ -39,7 +43,7 @@ Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) { Status DBImpl::get_group_files(const std::string& group_id, const int date_delta, - GroupFilesSchema& group_files_info) { + meta::GroupFilesSchema& group_files_info) { return _pMeta->get_group_files(group_id, date_delta, group_files_info); } @@ -99,8 +103,64 @@ void DBImpl::background_call() { _bg_work_finish_signal.notify_all(); } + +Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, + const meta::GroupFilesSchema& files) { + meta::GroupFileSchema group_file; + Status status = _pMeta->add_group_file(group_id, date, group_file); + if (!status.ok()) { + return status; + } + + faiss::IndexFlat innerIndex(group_file.dimension); + faiss::IndexIDMap index(&innerIndex); + + meta::GroupFilesSchema updated; + + for (auto& file : files) { + auto file_index = dynamic_cast(faiss::read_index(file.location.c_str())); + index.add_with_ids(file_index->ntotal, dynamic_cast(file_index->index)->xb.data(), + file_index->id_map.data()); + auto file_schema = file; + file_schema.file_type = meta::GroupFileSchema::TO_DELETE; + updated.push_back(file_schema); + } + + faiss::write_index(&index, group_file.location.c_str()); + group_file.file_type = meta::GroupFileSchema::RAW; + updated.push_back(group_file); + status = _pMeta->update_files(updated); + + return status; +} + +Status DBImpl::background_merge_files(const std::string& group_id) { + meta::DatePartionedGroupFilesSchema raw_files; + /* auto status = _pMeta->get_small_raw_files(group_id, raw_files); */ + /* if (!status.ok()) { */ + /* _bg_error = status; */ + /* return status; */ + /* } */ + + if (raw_files.size() == 0) { + return Status::OK(); + } + + for (auto& kv : raw_files) { + merge_files(group_id, kv.first, kv.second); + } +} + void DBImpl::background_compaction() { - _pMemMgr->serialize(); + std::vector group_ids; + _pMemMgr->serialize(group_ids); + for (auto group_id : group_ids) { + std::cout << __func__ << " group_id=" << group_id << std::endl; + } + + if (group_ids.size() > 0) { + + } } DBImpl::~DBImpl() { diff --git a/cpp/src/db/db_impl.h b/cpp/src/db/db_impl.h index b8e45f8006..18c6b154d1 100644 --- a/cpp/src/db/db_impl.h +++ b/cpp/src/db/db_impl.h @@ -14,19 +14,23 @@ namespace engine { class Env; +namespace meta { + class Meta; +} + class DBImpl : public DB { public: DBImpl(const Options& options_, const std::string& name_); virtual Status add_group(const GroupOptions& options_, const std::string& group_id_, - GroupSchema& group_info_) override; - virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override; + meta::GroupSchema& group_info_) override; + virtual Status get_group(const std::string& group_id_, meta::GroupSchema& group_info_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; virtual Status get_group_files(const std::string& group_id_, const int date_delta_, - GroupFilesSchema& group_files_info_) override; + meta::GroupFilesSchema& group_files_info_) override; virtual Status add_vectors(const std::string& group_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) override; @@ -37,6 +41,10 @@ public: virtual ~DBImpl(); private: + Status merge_files(const std::string& group_id, + const meta::DateT& date, + const meta::GroupFilesSchema& files); + Status background_merge_files(const std::string& group_id); void try_schedule_compaction(); void start_timer_task(int interval_); @@ -56,7 +64,7 @@ private: Status _bg_error; std::atomic _shutting_down; - std::shared_ptr _pMeta; + std::shared_ptr _pMeta; std::shared_ptr _pMemMgr; }; // DBImpl diff --git a/cpp/src/db/db_meta.cpp b/cpp/src/db/db_meta.cpp index 9274b8258e..2ac0995962 100644 --- a/cpp/src/db/db_meta.cpp +++ b/cpp/src/db/db_meta.cpp @@ -1,10 +1,21 @@ +#include #include "db_meta.h" namespace zilliz { namespace vecwise { namespace engine { +namespace meta { +DateT Meta::GetDate(const std::time_t& t) { + tm *ltm = std::localtime(&t); + return ltm->tm_year*10000 + ltm->tm_mon*100 + ltm->tm_mday; +} +DateT Meta::GetDate() { + return GetDate(std::time(nullptr)); +} + +} // namespace meta } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/db_meta.h b/cpp/src/db/db_meta.h index 1eae4b95a6..22ab911fd3 100644 --- a/cpp/src/db/db_meta.h +++ b/cpp/src/db/db_meta.h @@ -3,12 +3,17 @@ #include #include #include +#include +#include #include "options.h" #include "status.h" namespace zilliz { namespace vecwise { namespace engine { +namespace meta { + +typedef int DateT; struct GroupSchema { size_t id; @@ -22,19 +27,24 @@ struct GroupSchema { struct GroupFileSchema { typedef enum { + NEW, RAW, - INDEX + INDEX, + TO_DELETE, } FILE_TYPE; size_t id; std::string group_id; std::string file_id; - int files_type = RAW; + int file_type = NEW; size_t rows; + DateT date; + uint16_t dimension; std::string location = ""; }; // GroupFileSchema typedef std::vector GroupFilesSchema; +typedef std::map DatePartionedGroupFilesSchema; class Meta { @@ -47,6 +57,9 @@ public: virtual Status add_group_file(const std::string& group_id_, GroupFileSchema& group_file_info_) = 0; + virtual Status add_group_file(const std::string& group_id, + DateT date, + GroupFileSchema& group_file_info) = 0; virtual Status has_group_file(const std::string& group_id_, const std::string& file_id_, bool& has_or_not_) = 0; @@ -59,8 +72,14 @@ public: const int date_delta_, GroupFilesSchema& group_files_info_) = 0; + virtual Status update_files(const GroupFilesSchema& files) = 0; + + static DateT GetDate(const std::time_t& t); + static DateT GetDate(); + }; // MetaData +} // namespace meta } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/db_meta_impl.cpp b/cpp/src/db/db_meta_impl.cpp index 7ab9f65000..1f8c4a94ce 100644 --- a/cpp/src/db/db_meta_impl.cpp +++ b/cpp/src/db/db_meta_impl.cpp @@ -6,6 +6,7 @@ namespace zilliz { namespace vecwise { namespace engine { +namespace meta { DBMetaImpl::DBMetaImpl(const MetaOptions& options_) : _options(static_cast(options_)) { @@ -26,12 +27,6 @@ Status DBMetaImpl::add_group(const GroupOptions& options_, Status DBMetaImpl::get_group(const std::string& group_id_, GroupSchema& group_info_) { //PXU TODO - std::stringstream ss; - SimpleIDGenerator g; - ss.str(""); - ss << "/tmp/test/" << g.getNextIDNumber() << ".log"; - group_info_.dimension = 64; - group_info_.next_file_location = ss.str(); return Status::OK(); } @@ -40,9 +35,24 @@ Status DBMetaImpl::has_group(const std::string& group_id_, bool& has_or_not_) { return Status::OK(); } -Status DBMetaImpl::add_group_file(const std::string& group_id_, - GroupFileSchema& group_file_info_) { +Status DBMetaImpl::add_group_file(const std::string& group_id, + GroupFileSchema& group_file_info) { + return add_group_file(group_id, Meta::GetDate(), group_file_info); +} + +Status DBMetaImpl::add_group_file(const std::string& group_id, + DateT date, + GroupFileSchema& group_file_info) { //PXU TODO + std::stringstream ss; + SimpleIDGenerator g; + ss << "/tmp/test/" << date + << "/" << g.getNextIDNumber() + << ".log"; + group_file_info.group_id = "1"; + group_file_info.dimension = 64; + group_file_info.location = ss.str(); + group_file_info.date = date; return Status::OK(); } @@ -72,6 +82,12 @@ Status DBMetaImpl::update_group_file(const GroupFileSchema& group_file_) { return Status::OK(); } +Status DBMetaImpl::update_files(const GroupFilesSchema& files) { + //PXU TODO + return Status::OK(); +} + +} // namespace meta } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/db_meta_impl.h b/cpp/src/db/db_meta_impl.h index f8602c1635..06e04902ce 100644 --- a/cpp/src/db/db_meta_impl.h +++ b/cpp/src/db/db_meta_impl.h @@ -7,6 +7,7 @@ namespace zilliz { namespace vecwise { namespace engine { +namespace meta { class DBMetaImpl : public Meta { public: @@ -18,6 +19,9 @@ public: virtual Status get_group(const std::string& group_id_, GroupSchema& group_info_) override; virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override; + virtual Status add_group_file(const std::string& group_id, + DateT date, + GroupFileSchema& group_file_info) override; virtual Status add_group_file(const std::string& group_id_, GroupFileSchema& group_file_info_) override; virtual Status has_group_file(const std::string& group_id_, @@ -32,6 +36,8 @@ public: const int date_delta_, GroupFilesSchema& group_files_info_) override; + virtual Status update_files(const GroupFilesSchema& files) override; + private: Status initialize(); @@ -40,6 +46,7 @@ private: }; // DBMetaImpl +} // namespace meta } // namespace engine } // namespace vecwise } // namespace zilliz diff --git a/cpp/src/db/memvectors.cpp b/cpp/src/db/memvectors.cpp index 2bafe3a9de..384e69be3f 100644 --- a/cpp/src/db/memvectors.cpp +++ b/cpp/src/db/memvectors.cpp @@ -13,10 +13,12 @@ namespace zilliz { namespace vecwise { namespace engine { -MemVectors::MemVectors(size_t dimension_, const std::string& file_location_) : - _file_location(file_location_), +MemVectors::MemVectors(const std::string& group_id, + size_t dimension, const std::string& file_location) : + group_id_(group_id), + _file_location(file_location), _pIdGenerator(new SimpleIDGenerator()), - _dimension(dimension_), + _dimension(dimension), _pInnerIndex(new faiss::IndexFlat(_dimension)), _pIdMapIndex(new faiss::IndexIDMap(_pInnerIndex)) { } @@ -37,13 +39,15 @@ size_t MemVectors::approximate_size() const { return total() * _dimension; } -void MemVectors::serialize() { +Status MemVectors::serialize(std::string& group_id) { /* std::stringstream ss; */ /* ss << "/tmp/test/" << _pIdGenerator->getNextIDNumber(); */ /* faiss::write_index(_pIdMapIndex, ss.str().c_str()); */ /* std::cout << _pIdMapIndex->ntotal << std::endl; */ /* std::cout << _file_location << std::endl; */ faiss::write_index(_pIdMapIndex, _file_location.c_str()); + group_id = group_id_; + return Status::OK(); } MemVectors::~MemVectors() { @@ -71,14 +75,15 @@ VectorsPtr MemManager::get_mem_by_group(const std::string& group_id) { return memIt->second; } - GroupSchema group_info; - Status status = _pMeta->get_group(group_id, group_info); + meta::GroupFileSchema group_file; + auto status = _pMeta->add_group_file(group_id, group_file); if (!status.ok()) { return nullptr; } - _memMap[group_id] = std::shared_ptr(new MemVectors(group_info.dimension, - group_info.next_file_location)); + _memMap[group_id] = std::shared_ptr(new MemVectors(group_file.group_id, + group_file.dimension, + group_file.location)); return _memMap[group_id]; } @@ -126,10 +131,13 @@ Status MemManager::mark_memory_as_immutable() { /* return false; */ /* } */ -Status MemManager::serialize() { +Status MemManager::serialize(std::vector& group_ids) { mark_memory_as_immutable(); + std::string group_id; + group_ids.clear(); for (auto& mem : _immMems) { - mem->serialize(); + mem->serialize(group_id); + group_ids.push_back(group_id); } _immMems.clear(); return Status::OK(); diff --git a/cpp/src/db/memvectors.h b/cpp/src/db/memvectors.h index f9a0bba1f2..5a6df87eae 100644 --- a/cpp/src/db/memvectors.h +++ b/cpp/src/db/memvectors.h @@ -19,9 +19,15 @@ namespace zilliz { namespace vecwise { namespace engine { +namespace meta { + class Meta; +} + class MemVectors { public: - explicit MemVectors(size_t dimension_, const std::string& file_location_); + explicit MemVectors(const std::string& group_id, + size_t dimension, + const std::string& file_location); void add(size_t n_, const float* vectors_, IDNumbers& vector_ids_); @@ -29,7 +35,7 @@ public: size_t approximate_size() const; - void serialize(); + Status serialize(std::string& group_id); ~MemVectors(); @@ -40,6 +46,7 @@ private: MemVectors(const MemVectors&) = delete; MemVectors& operator=(const MemVectors&) = delete; + std::string group_id_; const std::string _file_location; IDGenerator* _pIdGenerator; size_t _dimension; @@ -49,12 +56,11 @@ private: }; // MemVectors -class Meta; typedef std::shared_ptr VectorsPtr; class MemManager { public: - MemManager(const std::shared_ptr& meta_) + MemManager(const std::shared_ptr& meta_) : _pMeta(meta_) /*_last_compact_time(std::time(nullptr))*/ {} VectorsPtr get_mem_by_group(const std::string& group_id_); @@ -62,7 +68,7 @@ public: Status add_vectors(const std::string& group_id_, size_t n_, const float* vectors_, IDNumbers& vector_ids_); - Status serialize(); + Status serialize(std::vector& group_ids); private: Status add_vectors_no_lock(const std::string& group_id_, @@ -73,7 +79,7 @@ private: typedef std::vector ImmMemPool; MemMap _memMap; ImmMemPool _immMems; - std::shared_ptr _pMeta; + std::shared_ptr _pMeta; /* std::time_t _last_compact_time; */ std::mutex _mutex; }; // MemManager