Merge branch 'code_refactor' into 'branch-1.2'

MS-20 Clean Code Part 1

See merge request megasearch/vecwise_engine!35

Former-commit-id: 41f8e60f0e70a4f87bc455d4db976bab58e06fce
This commit is contained in:
jinhai 2019-05-28 17:44:51 +08:00
commit 97373af4f2
32 changed files with 809 additions and 1350 deletions

View File

@ -17,3 +17,4 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-1 - Add CHANGELOG.md
- MS-4 - Refactor the vecwise_engine code structure
- MS-6 - Implement SDK interface part 1
- MS-20 - Clean Code Part 1

View File

@ -7,8 +7,6 @@
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
/* #include "FaissExecutionEngine.h" */
/* #include "Traits.h" */
#include "Factories.h"
namespace zilliz {

View File

@ -5,12 +5,13 @@
******************************************************************************/
#pragma once
#include <string>
#include "Options.h"
#include "Meta.h"
#include "Status.h"
#include "Types.h"
#include <string>
namespace zilliz {
namespace vecwise {
namespace engine {
@ -21,29 +22,22 @@ class DB {
public:
static void Open(const Options& options, DB** dbptr);
virtual Status add_group(meta::GroupSchema& group_info_) = 0;
virtual Status get_group(meta::GroupSchema& group_info_) = 0;
virtual Status delete_vectors(const std::string& group_id,
const meta::DatesT& dates) = 0;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
meta::GroupFilesSchema& group_files_info_) = 0;
virtual Status CreateTable(meta::TableSchema& table_schema_) = 0;
virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0;
virtual Status HasTable(const std::string& table_id_, bool& has_or_not_) = 0;
virtual Status add_vectors(const std::string& group_id_,
virtual Status InsertVectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) = 0;
virtual Status search(const std::string& group_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, QueryResults& results) = 0;
virtual Status search(const std::string& group_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) = 0;
virtual Status size(long& result) = 0;
virtual Status Size(long& result) = 0;
virtual Status drop_all() = 0;
virtual Status count(const std::string& group_id, long& result) = 0;
virtual Status DropAll() = 0;
DB() = default;
DB(const DB&) = delete;

View File

@ -5,15 +5,15 @@
******************************************************************************/
#pragma once
#include "DB.h"
#include "MemManager.h"
#include "Types.h"
#include "Traits.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
#include "DB.h"
#include "MemManager.h"
#include "Types.h"
#include "FaissExecutionEngine.h"
#include "Traits.h"
namespace zilliz {
namespace vecwise {
@ -28,63 +28,56 @@ namespace meta {
template <typename EngineT>
class DBImpl : public DB {
public:
typedef typename meta::Meta::Ptr MetaPtr;
typedef typename MemManager<EngineT>::Ptr MemManagerPtr;
using MetaPtr = meta::Meta::Ptr;
using MemManagerPtr = typename MemManager<EngineT>::Ptr;
DBImpl(const Options& options);
virtual Status add_group(meta::GroupSchema& group_info) override;
virtual Status get_group(meta::GroupSchema& group_info) override;
virtual Status delete_vectors(const std::string& group_id, const meta::DatesT& dates) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status CreateTable(meta::TableSchema& table_schema) override;
virtual Status DescribeTable(meta::TableSchema& table_schema) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
meta::GroupFilesSchema& group_files_info_) override;
virtual Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids) override;
virtual Status add_vectors(const std::string& group_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) override;
virtual Status search(const std::string& group_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, QueryResults& results) override;
virtual Status search(const std::string& group_id, size_t k, size_t nq,
virtual Status Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) override;
virtual Status drop_all() override;
virtual Status DropAll() override;
virtual Status count(const std::string& group_id, long& result) override;
virtual Status size(long& result) override;
virtual Status Size(long& result) override;
virtual ~DBImpl();
private:
void background_build_index();
Status build_index(const meta::GroupFileSchema&);
Status try_build_index();
Status merge_files(const std::string& group_id,
void BackgroundBuildIndex();
Status BuildIndex(const meta::TableFileSchema&);
Status TryBuildIndex();
Status MergeFiles(const std::string& table_id,
const meta::DateT& date,
const meta::GroupFilesSchema& files);
Status background_merge_files(const std::string& group_id);
const meta::TableFilesSchema& files);
Status BackgroundMergeFiles(const std::string& table_id);
void try_schedule_compaction();
void start_timer_task(int interval_);
void background_timer_task(int interval_);
void TrySchedule();
void StartTimerTasks(int interval);
void BackgroundTimerTask(int interval);
static void BGWork(void* db);
void background_call();
void background_compaction();
void BackgroundCall();
void BackgroundCompaction();
Env* const _env;
const Options _options;
Env* const env_;
const Options options_;
std::mutex _mutex;
std::condition_variable _bg_work_finish_signal;
bool _bg_compaction_scheduled;
Status _bg_error;
std::atomic<bool> _shutting_down;
std::mutex mutex_;
std::condition_variable bg_work_finish_signal_;
bool bg_compaction_scheduled_;
Status bg_error_;
std::atomic<bool> shutting_down_;
std::mutex build_index_mutex_;
bool bg_build_index_started_;
@ -92,8 +85,8 @@ private:
std::thread bg_timer_thread_;
MetaPtr _pMeta;
MemManagerPtr _pMemMgr;
MetaPtr pMeta_;
MemManagerPtr pMemMgr_;
}; // DBImpl
@ -102,4 +95,4 @@ private:
} // namespace vecwise
} // namespace zilliz
#include "DBImpl.cpp"
#include "DBImpl.inl"

View File

@ -3,8 +3,11 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#ifndef DBIMPL_CPP__
#define DBIMPL_CPP__
#pragma once
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include <assert.h>
#include <chrono>
@ -14,86 +17,68 @@
#include <easylogging++.h>
#include <cache/CpuCacheMgr.h>
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
namespace zilliz {
namespace vecwise {
namespace engine {
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
: _env(options.env),
_options(options),
_bg_compaction_scheduled(false),
_shutting_down(false),
: env_(options.env),
options_(options),
bg_compaction_scheduled_(false),
shutting_down_(false),
bg_build_index_started_(false),
_pMeta(new meta::DBMetaImpl(_options.meta)),
_pMemMgr(new MemManager<EngineT>(_pMeta, _options)) {
start_timer_task(_options.memory_sync_interval);
pMeta_(new meta::DBMetaImpl(options_.meta)),
pMemMgr_(new MemManager<EngineT>(pMeta_, options_)) {
StartTimerTasks(options_.memory_sync_interval);
}
template<typename EngineT>
Status DBImpl<EngineT>::add_group(meta::GroupSchema& group_info) {
return _pMeta->add_group(group_info);
Status DBImpl<EngineT>::CreateTable(meta::TableSchema& table_schema) {
return pMeta_->CreateTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::get_group(meta::GroupSchema& group_info) {
return _pMeta->get_group(group_info);
Status DBImpl<EngineT>::DescribeTable(meta::TableSchema& table_schema) {
return pMeta_->DescribeTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::delete_vectors(const std::string& group_id,
const meta::DatesT& dates) {
return _pMeta->delete_group_partitions(group_id, dates);
Status DBImpl<EngineT>::HasTable(const std::string& table_id, bool& has_or_not) {
return pMeta_->HasTable(table_id, has_or_not);
}
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not_) {
return _pMeta->has_group(group_id_, has_or_not_);
}
template<typename EngineT>
Status DBImpl<EngineT>::get_group_files(const std::string& group_id,
const int date_delta,
meta::GroupFilesSchema& group_files_info) {
return _pMeta->get_group_files(group_id, date_delta, group_files_info);
}
// Inserts `n` vectors into the table identified by `table_id_`.
//
// The call is forwarded to the memory manager; `vector_ids_` is handed through
// to it unchanged (presumably it receives the ids of the inserted vectors --
// confirm against MemManager::InsertVectors).
//
// Returns the memory manager's status for both failure and success. The
// previous version only returned on failure and otherwise ran off the end of
// a non-void function, which is undefined behaviour.
template<typename EngineT>
Status DBImpl<EngineT>::InsertVectors(const std::string& table_id_,
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
    return pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
}
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string &group_id, size_t k, size_t nq,
Status DBImpl<EngineT>::Query(const std::string &table_id, size_t k, size_t nq,
const float *vectors, QueryResults &results) {
meta::DatesT dates = {meta::Meta::GetDate()};
return search(group_id, k, nq, vectors, dates, results);
return Query(table_id, k, nq, vectors, dates, results);
}
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedGroupFilesSchema files;
auto status = _pMeta->files_to_search(group_id, dates, files);
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToSearch(table_id, dates, files);
if (!status.ok()) { return status; }
LOG(DEBUG) << "Search DateT Size=" << files.size();
meta::GroupFilesSchema index_files;
meta::GroupFilesSchema raw_files;
meta::TableFilesSchema index_files;
meta::TableFilesSchema raw_files;
for (auto &day_files : files) {
for (auto &file : day_files.second) {
file.file_type == meta::GroupFileSchema::INDEX ?
file.file_type == meta::TableFileSchema::INDEX ?
index_files.push_back(file) : raw_files.push_back(file);
}
}
@ -132,7 +117,7 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
long search_set_size = 0;
auto search_in_index = [&](meta::GroupFilesSchema& file_vec) -> void {
auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
for (auto &file : file_vec) {
EngineT index(file.dimension, file.location);
index.Load();
@ -204,98 +189,98 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
}
if (results.empty()) {
return Status::NotFound("Group " + group_id + ", search result not found!");
return Status::NotFound("Group " + table_id + ", search result not found!");
}
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::start_timer_task(int interval_) {
bg_timer_thread_ = std::thread(&DBImpl<EngineT>::background_timer_task, this, interval_);
void DBImpl<EngineT>::StartTimerTasks(int interval) {
bg_timer_thread_ = std::thread(&DBImpl<EngineT>::BackgroundTimerTask, this, interval);
}
// Body of the timer thread started by StartTimerTasks().
//
// Every `interval` seconds it calls TrySchedule(). The loop ends once a
// background error has been recorded in bg_error_ or shutting_down_ is set.
// Both conditions are only checked between sleeps, so the thread can take up
// to `interval` seconds to notice a shutdown request.
template<typename EngineT>
void DBImpl<EngineT>::BackgroundTimerTask(int interval) {
    while (bg_error_.ok() && !shutting_down_.load(std::memory_order_acquire)) {
        std::this_thread::sleep_for(std::chrono::seconds(interval));
        TrySchedule();
    }
}
template<typename EngineT>
void DBImpl<EngineT>::try_schedule_compaction() {
if (_bg_compaction_scheduled) return;
if (!_bg_error.ok()) return;
void DBImpl<EngineT>::TrySchedule() {
if (bg_compaction_scheduled_) return;
if (!bg_error_.ok()) return;
_bg_compaction_scheduled = true;
_env->schedule(&DBImpl<EngineT>::BGWork, this);
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl<EngineT>::BGWork, this);
}
// Static trampoline passed to Env::Schedule() by TrySchedule().
//
// `db` is the DBImpl instance that TrySchedule() passed as `this`; converting
// a void* back to its original object type is a static_cast, not a
// reinterpret_cast.
template<typename EngineT>
void DBImpl<EngineT>::BGWork(void* db) {
    static_cast<DBImpl*>(db)->BackgroundCall();
}
// Runs one scheduled round of background work while holding mutex_.
//
// Compaction is skipped when a background error is already recorded or when
// shutdown has been requested. In every case the "scheduled" flag is cleared
// and waiters are notified afterwards: ~DBImpl() sets shutting_down_ and then
// blocks on bg_work_finish_signal_ until bg_compaction_scheduled_ is false, so
// returning early without resetting the flag would leave the destructor
// waiting forever.
template<typename EngineT>
void DBImpl<EngineT>::BackgroundCall() {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(bg_compaction_scheduled_);

    if (bg_error_.ok() && !shutting_down_.load(std::memory_order_acquire)) {
        BackgroundCompaction();
    }

    bg_compaction_scheduled_ = false;
    bg_work_finish_signal_.notify_all();
}
template<typename EngineT>
Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::DateT& date,
const meta::GroupFilesSchema& files) {
meta::GroupFileSchema group_file;
group_file.group_id = group_id;
group_file.date = date;
Status status = _pMeta->add_group_file(group_file);
Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::DateT& date,
const meta::TableFilesSchema& files) {
meta::TableFileSchema table_file;
table_file.table_id = table_id;
table_file.date = date;
Status status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
LOG(INFO) << status.ToString() << std::endl;
return status;
}
EngineT index(group_file.dimension, group_file.location);
EngineT index(table_file.dimension, table_file.location);
meta::GroupFilesSchema updated;
meta::TableFilesSchema updated;
long index_size = 0;
for (auto& file : files) {
index.Merge(file.location);
auto file_schema = file;
file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
file_schema.file_type = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
LOG(DEBUG) << "Merging file " << file_schema.file_id;
index_size = index.Size();
if (index_size >= _options.index_trigger_size) break;
if (index_size >= options_.index_trigger_size) break;
}
index.Serialize();
if (index_size >= _options.index_trigger_size) {
group_file.file_type = meta::GroupFileSchema::TO_INDEX;
if (index_size >= options_.index_trigger_size) {
table_file.file_type = meta::TableFileSchema::TO_INDEX;
} else {
group_file.file_type = meta::GroupFileSchema::RAW;
table_file.file_type = meta::TableFileSchema::RAW;
}
group_file.size = index_size;
updated.push_back(group_file);
status = _pMeta->update_files(updated);
LOG(DEBUG) << "New merged file " << group_file.file_id <<
table_file.size = index_size;
updated.push_back(table_file);
status = pMeta_->UpdateTableFiles(updated);
LOG(DEBUG) << "New merged file " << table_file.file_id <<
" of size=" << index.PhysicalSize()/(1024*1024) << " M";
index.Cache();
@ -304,43 +289,39 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
}
// Merges the raw files of `table_id`, one date partition at a time.
//
// Steps:
//   1. Ask the meta store which files are candidates (grouped by date).
//   2. For every date whose file count exceeds options_.merge_trigger_number,
//      merge that partition's files via MergeFiles().
//   3. Archive meta entries, kick off index building, and clean up files via
//      CleanUpFilesWithTTL(1).
//
// Returns the FilesToMerge() error if the lookup fails, otherwise OK. A failed
// merge of one partition is logged and does not stop the remaining partitions
// or the follow-up steps (the statuses of those follow-up calls are not
// inspected either, as before).
template<typename EngineT>
Status DBImpl<EngineT>::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = pMeta_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        return status;
    }

    for (auto& kv : raw_files) {
        // Bind by reference: copying the whole file list per partition is unnecessary.
        const auto& files = kv.second;
        if (files.size() <= options_.merge_trigger_number) {
            continue;
        }
        auto merge_status = MergeFiles(table_id, kv.first, files);
        if (!merge_status.ok()) {
            LOG(ERROR) << "Merge files of table " << table_id << " failed: "
                       << merge_status.ToString();
        }
    }

    pMeta_->Archive();

    TryBuildIndex();

    pMeta_->CleanUpFilesWithTTL(1);

    return Status::OK();
}
template<typename EngineT>
Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
meta::GroupFileSchema group_file;
group_file.group_id = file.group_id;
group_file.date = file.date;
Status status = _pMeta->add_group_file(group_file);
Status DBImpl<EngineT>::BuildIndex(const meta::TableFileSchema& file) {
meta::TableFileSchema table_file;
table_file.table_id = file.table_id;
table_file.date = file.date;
Status status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
return status;
}
@ -348,39 +329,39 @@ Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
EngineT to_index(file.dimension, file.location);
to_index.Load();
auto index = to_index.BuildIndex(group_file.location);
auto index = to_index.BuildIndex(table_file.location);
group_file.file_type = meta::GroupFileSchema::INDEX;
group_file.size = index->Size();
table_file.file_type = meta::TableFileSchema::INDEX;
table_file.size = index->Size();
auto to_remove = file;
to_remove.file_type = meta::GroupFileSchema::TO_DELETE;
to_remove.file_type = meta::TableFileSchema::TO_DELETE;
meta::GroupFilesSchema update_files = {to_remove, group_file};
_pMeta->update_files(update_files);
meta::TableFilesSchema update_files = {to_remove, table_file};
pMeta_->UpdateTableFiles(update_files);
LOG(DEBUG) << "New index file " << group_file.file_id << " of size "
LOG(DEBUG) << "New index file " << table_file.file_id << " of size "
<< index->PhysicalSize()/(1024*1024) << " M"
<< " from file " << to_remove.file_id;
index->Cache();
_pMeta->archive_files();
pMeta_->Archive();
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::background_build_index() {
void DBImpl<EngineT>::BackgroundBuildIndex() {
std::lock_guard<std::mutex> lock(build_index_mutex_);
assert(bg_build_index_started_);
meta::GroupFilesSchema to_index_files;
_pMeta->files_to_index(to_index_files);
meta::TableFilesSchema to_index_files;
pMeta_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
/* LOG(DEBUG) << "Building index for " << file.location; */
status = build_index(file);
status = BuildIndex(file);
if (!status.ok()) {
_bg_error = status;
bg_error_ = status;
return;
}
}
@ -391,52 +372,47 @@ void DBImpl<EngineT>::background_build_index() {
}
template<typename EngineT>
Status DBImpl<EngineT>::try_build_index() {
Status DBImpl<EngineT>::TryBuildIndex() {
if (bg_build_index_started_) return Status::OK();
if (_shutting_down.load(std::memory_order_acquire)) return Status::OK();
if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
bg_build_index_started_ = true;
std::thread build_index_task(&DBImpl<EngineT>::background_build_index, this);
std::thread build_index_task(&DBImpl<EngineT>::BackgroundBuildIndex, this);
build_index_task.detach();
return Status::OK();
}
// One compaction round: asks the memory manager to serialize, which fills
// `table_ids`, then runs BackgroundMergeFiles() for each returned table id.
//
// The first failing merge is recorded in bg_error_ and aborts the round.
// The status returned by Serialize() itself is not inspected.
template<typename EngineT>
void DBImpl<EngineT>::BackgroundCompaction() {
    std::vector<std::string> table_ids;
    pMemMgr_->Serialize(table_ids);

    // Iterate by const reference; the ids are only read.
    for (const auto& table_id : table_ids) {
        auto status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
            bg_error_ = status;
            return;
        }
    }
}
template<typename EngineT>
Status DBImpl<EngineT>::drop_all() {
return _pMeta->drop_all();
Status DBImpl<EngineT>::DropAll() {
return pMeta_->DropAll();
}
template<typename EngineT>
Status DBImpl<EngineT>::count(const std::string& group_id, long& result) {
return _pMeta->count(group_id, result);
}
template<typename EngineT>
Status DBImpl<EngineT>::size(long& result) {
return _pMeta->size(result);
Status DBImpl<EngineT>::Size(long& result) {
return pMeta_->Size(result);
}
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
{
std::unique_lock<std::mutex> lock(_mutex);
_shutting_down.store(true, std::memory_order_release);
while (_bg_compaction_scheduled) {
_bg_work_finish_signal.wait(lock);
std::unique_lock<std::mutex> lock(mutex_);
shutting_down_.store(true, std::memory_order_release);
while (bg_compaction_scheduled_) {
bg_work_finish_signal_.wait(lock);
}
}
{
@ -447,12 +423,10 @@ DBImpl<EngineT>::~DBImpl() {
}
bg_timer_thread_.join();
std::vector<std::string> ids;
_pMemMgr->serialize(ids);
_env->Stop();
pMemMgr_->Serialize(ids);
env_->Stop();
}
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#endif

View File

@ -3,6 +3,10 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
#include "MetaConsts.h"
#include <unistd.h>
#include <sstream>
@ -13,11 +17,6 @@
#include <sqlite_orm.h>
#include <easylogging++.h>
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "Utils.h"
#include "MetaConsts.h"
namespace zilliz {
namespace vecwise {
namespace engine {
@ -27,21 +26,21 @@ using namespace sqlite_orm;
inline auto StoragePrototype(const std::string& path) {
return make_storage(path,
make_table("Group",
make_column("id", &GroupSchema::id, primary_key()),
make_column("group_id", &GroupSchema::group_id, unique()),
make_column("dimension", &GroupSchema::dimension),
make_column("created_on", &GroupSchema::created_on),
make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))),
make_table("GroupFile",
make_column("id", &GroupFileSchema::id, primary_key()),
make_column("group_id", &GroupFileSchema::group_id),
make_column("file_id", &GroupFileSchema::file_id),
make_column("file_type", &GroupFileSchema::file_type),
make_column("size", &GroupFileSchema::size, default_value(0)),
make_column("updated_time", &GroupFileSchema::updated_time),
make_column("created_on", &GroupFileSchema::created_on),
make_column("date", &GroupFileSchema::date))
make_table("Table",
make_column("id", &TableSchema::id, primary_key()),
make_column("table_id", &TableSchema::table_id, unique()),
make_column("dimension", &TableSchema::dimension),
make_column("created_on", &TableSchema::created_on),
make_column("files_cnt", &TableSchema::files_cnt, default_value(0))),
make_table("TableFile",
make_column("id", &TableFileSchema::id, primary_key()),
make_column("table_id", &TableFileSchema::table_id),
make_column("file_id", &TableFileSchema::file_id),
make_column("file_type", &TableFileSchema::file_type),
make_column("size", &TableFileSchema::size, default_value(0)),
make_column("updated_time", &TableFileSchema::updated_time),
make_column("created_on", &TableFileSchema::created_on),
make_column("date", &TableFileSchema::date))
);
}
@ -49,77 +48,77 @@ inline auto StoragePrototype(const std::string& path) {
using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;
std::string DBMetaImpl::GetGroupPath(const std::string& group_id) {
return _options.path + "/tables/" + group_id;
std::string DBMetaImpl::GetTablePath(const std::string& table_id) {
return options_.path + "/tables/" + table_id;
}
std::string DBMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string& table_id, DateT& date) {
std::stringstream ss;
ss << GetGroupPath(group_id) << "/" << date;
ss << GetTablePath(table_id) << "/" << date;
return ss.str();
}
void DBMetaImpl::GetGroupFilePath(GroupFileSchema& group_file) {
void DBMetaImpl::GetTableFilePath(TableFileSchema& group_file) {
if (group_file.date == EmptyDate) {
group_file.date = Meta::GetDate();
}
std::stringstream ss;
ss << GetGroupDatePartitionPath(group_file.group_id, group_file.date)
ss << GetTableDatePartitionPath(group_file.table_id, group_file.date)
<< "/" << group_file.file_id;
group_file.location = ss.str();
}
Status DBMetaImpl::NextGroupId(std::string& group_id) {
Status DBMetaImpl::NextTableId(std::string& table_id) {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.getNextIDNumber();
group_id = ss.str();
ss << g.GetNextIDNumber();
table_id = ss.str();
return Status::OK();
}
Status DBMetaImpl::NextFileId(std::string& file_id) {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.getNextIDNumber();
ss << g.GetNextIDNumber();
file_id = ss.str();
return Status::OK();
}
// Stores a copy of the meta options and runs Initialize().
//
// The parameter is named `options` (no trailing underscore) so that it does
// not shadow the `options_` member inside the constructor. The Status
// returned by Initialize() is not inspected here.
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options)
    : options_(options) {
    Initialize();
}
Status DBMetaImpl::initialize() {
if (!boost::filesystem::is_directory(_options.path)) {
auto ret = boost::filesystem::create_directory(_options.path);
Status DBMetaImpl::Initialize() {
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
if (!ret) {
LOG(ERROR) << "Create directory " << _options.path << " Error";
LOG(ERROR) << "Create directory " << options_.path << " Error";
}
assert(ret);
}
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(_options.path+"/meta.sqlite"));
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path+"/meta.sqlite"));
ConnectorPtr->sync_schema();
ConnectorPtr->open_forever(); // thread safe option
ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
cleanup();
CleanUp();
return Status::OK();
}
// PXU TODO: Temp solution. Will fix later
Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
const meta::DatesT& dates) {
Status DBMetaImpl::DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) {
if (dates.size() == 0) {
return Status::OK();
}
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group(group_info);
TableSchema table_schema;
table_schema.table_id = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
@ -135,11 +134,11 @@ Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
try {
ConnectorPtr->update_all(
set(
c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE
),
where(
c(&GroupFileSchema::group_id) == group_id and
in(&GroupFileSchema::date, dates)
c(&TableFileSchema::table_id) == table_id and
in(&TableFileSchema::date, dates)
));
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
@ -148,24 +147,24 @@ Status DBMetaImpl::delete_group_partitions(const std::string& group_id,
return Status::OK();
}
Status DBMetaImpl::add_group(GroupSchema& group_info) {
if (group_info.group_id == "") {
NextGroupId(group_info.group_id);
Status DBMetaImpl::CreateTable(TableSchema& table_schema) {
if (table_schema.table_id == "") {
NextTableId(table_schema.table_id);
}
group_info.files_cnt = 0;
group_info.id = -1;
group_info.created_on = utils::GetMicroSecTimeStamp();
table_schema.files_cnt = 0;
table_schema.id = -1;
table_schema.created_on = utils::GetMicroSecTimeStamp();
{
try {
auto id = ConnectorPtr->insert(group_info);
group_info.id = id;
auto id = ConnectorPtr->insert(table_schema);
table_schema.id = id;
} catch (...) {
return Status::DBTransactionError("Add Group Error");
return Status::DBTransactionError("Add Table Error");
}
}
auto group_path = GetGroupPath(group_info.group_id);
auto group_path = GetTablePath(table_schema.table_id);
if (!boost::filesystem::is_directory(group_path)) {
auto ret = boost::filesystem::create_directories(group_path);
@ -178,24 +177,20 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
return Status::OK();
}
Status DBMetaImpl::get_group(GroupSchema& group_info) {
return get_group_no_lock(group_info);
}
Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
Status DBMetaImpl::DescribeTable(TableSchema& table_schema) {
try {
auto groups = ConnectorPtr->select(columns(&GroupSchema::id,
&GroupSchema::group_id,
&GroupSchema::files_cnt,
&GroupSchema::dimension),
where(c(&GroupSchema::group_id) == group_info.group_id));
auto groups = ConnectorPtr->select(columns(&TableSchema::id,
&TableSchema::table_id,
&TableSchema::files_cnt,
&TableSchema::dimension),
where(c(&TableSchema::table_id) == table_schema.table_id));
assert(groups.size() <= 1);
if (groups.size() == 1) {
group_info.id = std::get<0>(groups[0]);
group_info.files_cnt = std::get<2>(groups[0]);
group_info.dimension = std::get<3>(groups[0]);
table_schema.id = std::get<0>(groups[0]);
table_schema.files_cnt = std::get<2>(groups[0]);
table_schema.dimension = std::get<3>(groups[0]);
} else {
return Status::NotFound("Group " + group_info.group_id + " not found");
return Status::NotFound("Table " + table_schema.table_id + " not found");
}
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
@ -205,12 +200,12 @@ Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) {
return Status::OK();
}
Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
Status DBMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
try {
auto groups = ConnectorPtr->select(columns(&GroupSchema::id),
where(c(&GroupSchema::group_id) == group_id));
assert(groups.size() <= 1);
if (groups.size() == 1) {
auto tables = ConnectorPtr->select(columns(&TableSchema::id),
where(c(&TableSchema::table_id) == table_id));
assert(tables.size() <= 1);
if (tables.size() == 1) {
has_or_not = true;
} else {
has_or_not = false;
@ -222,35 +217,35 @@ Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
return Status::OK();
}
Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
if (group_file.date == EmptyDate) {
group_file.date = Meta::GetDate();
Status DBMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
if (file_schema.date == EmptyDate) {
file_schema.date = Meta::GetDate();
}
GroupSchema group_info;
group_info.group_id = group_file.group_id;
auto status = get_group(group_info);
TableSchema table_schema;
table_schema.table_id = file_schema.table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
NextFileId(group_file.file_id);
group_file.file_type = GroupFileSchema::NEW;
group_file.dimension = group_info.dimension;
group_file.size = 0;
group_file.created_on = utils::GetMicroSecTimeStamp();
group_file.updated_time = group_file.created_on;
GetGroupFilePath(group_file);
NextFileId(file_schema.file_id);
file_schema.file_type = TableFileSchema::NEW;
file_schema.dimension = table_schema.dimension;
file_schema.size = 0;
file_schema.created_on = utils::GetMicroSecTimeStamp();
file_schema.updated_time = file_schema.created_on;
GetTableFilePath(file_schema);
{
try {
auto id = ConnectorPtr->insert(group_file);
group_file.id = id;
auto id = ConnectorPtr->insert(file_schema);
file_schema.id = id;
} catch (...) {
return Status::DBTransactionError("Add file Error");
}
}
auto partition_path = GetGroupDatePartitionPath(group_file.group_id, group_file.date);
auto partition_path = GetTableDatePartitionPath(file_schema.table_id, file_schema.date);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
@ -263,41 +258,41 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
return Status::OK();
}
Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
Status DBMetaImpl::FilesToIndex(TableFilesSchema& files) {
files.clear();
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX));
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_INDEX));
std::map<std::string, GroupSchema> groups;
std::map<std::string, TableSchema> groups;
TableFileSchema table_file;
for (auto& file : selected) {
GroupFileSchema group_file;
group_file.id = std::get<0>(file);
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
GetGroupFilePath(group_file);
auto groupItr = groups.find(group_file.group_id);
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
table_file.file_type = std::get<3>(file);
table_file.size = std::get<4>(file);
table_file.date = std::get<5>(file);
GetTableFilePath(table_file);
auto groupItr = groups.find(table_file.table_id);
if (groupItr == groups.end()) {
GroupSchema group_info;
group_info.group_id = group_file.group_id;
auto status = get_group_no_lock(group_info);
TableSchema table_schema;
table_schema.table_id = table_file.table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
groups[group_file.group_id] = group_info;
groups[table_file.table_id] = table_schema;
}
group_file.dimension = groups[group_file.group_id].dimension;
files.push_back(group_file);
table_file.dimension = groups[table_file.table_id].dimension;
files.push_back(table_file);
}
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
@ -307,48 +302,49 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
return Status::OK();
}
Status DBMetaImpl::files_to_search(const std::string &group_id,
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT& partition,
DatePartionedGroupFilesSchema &files) {
DatePartionedTableFilesSchema &files) {
files.clear();
DatesT today = {Meta::GetDate()};
const DatesT& dates = (partition.empty() == true) ? today : partition;
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::group_id) == group_id and
in(&GroupFileSchema::date, dates) and
(c(&GroupFileSchema::file_type) == (int) GroupFileSchema::RAW or
c(&GroupFileSchema::file_type) == (int) GroupFileSchema::TO_INDEX or
c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX)));
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::table_id) == table_id and
in(&TableFileSchema::date, dates) and
(c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or
c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_INDEX or
c(&TableFileSchema::file_type) == (int) TableFileSchema::INDEX)));
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
TableSchema table_schema;
table_schema.table_id = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto& file : selected) {
GroupFileSchema group_file;
group_file.id = std::get<0>(file);
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
group_file.dimension = group_info.dimension;
GetGroupFilePath(group_file);
auto dateItr = files.find(group_file.date);
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
table_file.file_type = std::get<3>(file);
table_file.size = std::get<4>(file);
table_file.date = std::get<5>(file);
table_file.dimension = table_schema.dimension;
GetTableFilePath(table_file);
auto dateItr = files.find(table_file.date);
if (dateItr == files.end()) {
files[group_file.date] = GroupFilesSchema();
files[table_file.date] = TableFilesSchema();
}
files[group_file.date].push_back(group_file);
files[table_file.date].push_back(table_file);
}
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
@ -358,42 +354,42 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
return Status::OK();
}
Status DBMetaImpl::files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) {
Status DBMetaImpl::FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) {
files.clear();
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and
c(&GroupFileSchema::group_id) == group_id));
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::RAW and
c(&TableFileSchema::table_id) == table_id));
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
TableSchema table_schema;
table_schema.table_id = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto& file : selected) {
GroupFileSchema group_file;
group_file.id = std::get<0>(file);
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
group_file.dimension = group_info.dimension;
GetGroupFilePath(group_file);
auto dateItr = files.find(group_file.date);
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
table_file.file_type = std::get<3>(file);
table_file.size = std::get<4>(file);
table_file.date = std::get<5>(file);
table_file.dimension = table_schema.dimension;
GetTableFilePath(table_file);
auto dateItr = files.find(table_file.date);
if (dateItr == files.end()) {
files[group_file.date] = GroupFilesSchema();
files[table_file.date] = TableFilesSchema();
}
files[group_file.date].push_back(group_file);
files[table_file.date].push_back(table_file);
}
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
@ -403,36 +399,29 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
return Status::OK();
}
// Unimplemented stub: performs no lookup, never writes has_or_not_, and
// always returns Status::OK(). Callers therefore cannot rely on the out
// parameter being set.
Status DBMetaImpl::has_group_file(const std::string& group_id_,
const std::string& file_id_,
bool& has_or_not_) {
//PXU TODO
return Status::OK();
}
Status DBMetaImpl::GetTableFile(TableFileSchema& file_schema) {
Status DBMetaImpl::get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) {
try {
auto files = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_id) == file_id_ and
c(&GroupFileSchema::group_id) == group_id_
auto files = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_id) == file_schema.file_id and
c(&TableFileSchema::table_id) == file_schema.table_id
));
assert(files.size() <= 1);
if (files.size() == 1) {
group_file_info_.id = std::get<0>(files[0]);
group_file_info_.group_id = std::get<1>(files[0]);
group_file_info_.file_id = std::get<2>(files[0]);
group_file_info_.file_type = std::get<3>(files[0]);
group_file_info_.size = std::get<4>(files[0]);
group_file_info_.date = std::get<5>(files[0]);
file_schema.id = std::get<0>(files[0]);
file_schema.table_id = std::get<1>(files[0]);
file_schema.file_id = std::get<2>(files[0]);
file_schema.file_type = std::get<3>(files[0]);
file_schema.size = std::get<4>(files[0]);
file_schema.date = std::get<5>(files[0]);
} else {
return Status::NotFound("GroupFile " + file_id_ + " not found");
return Status::NotFound("Table:" + file_schema.table_id +
" File:" + file_schema.file_id + " not found");
}
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
@ -442,16 +431,9 @@ Status DBMetaImpl::get_group_file(const std::string& group_id_,
return Status::OK();
}
// Unimplemented stub: ignores group_id_ and date_delta_, leaves
// group_files_info_ untouched, and always returns Status::OK().
Status DBMetaImpl::get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) {
// PXU TODO
return Status::OK();
}
// PXU TODO: Support Swap
Status DBMetaImpl::archive_files() {
auto& criterias = _options.archive_conf.GetCriterias();
Status DBMetaImpl::Archive() {
auto& criterias = options_.archive_conf.GetCriterias();
if (criterias.size() == 0) {
return Status::OK();
}
@ -466,11 +448,11 @@ Status DBMetaImpl::archive_files() {
{
ConnectorPtr->update_all(
set(
c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE
),
where(
c(&GroupFileSchema::created_on) < (long)(now - usecs) and
c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
c(&TableFileSchema::created_on) < (long)(now - usecs) and
c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE
));
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
@ -479,23 +461,22 @@ Status DBMetaImpl::archive_files() {
}
if (criteria == "disk") {
long sum = 0;
size(sum);
Size(sum);
// PXU TODO: refactor size
auto to_delete = (sum - limit*G);
discard_files_of_size(to_delete);
DiscardFiles(to_delete);
}
}
return Status::OK();
}
Status DBMetaImpl::size(long& result) {
Status DBMetaImpl::Size(long& result) {
result = 0;
try {
auto selected = ConnectorPtr->select(columns(sum(&GroupFileSchema::size)),
auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size)),
where(
c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE
c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE
));
for (auto& sub_query : selected) {
@ -512,27 +493,28 @@ Status DBMetaImpl::size(long& result) {
return Status::OK();
}
Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
LOG(DEBUG) << "Abort to discard size=" << to_discard_size;
Status DBMetaImpl::DiscardFiles(long to_discard_size) {
LOG(DEBUG) << "About to discard size=" << to_discard_size;
if (to_discard_size <= 0) {
return Status::OK();
}
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::size),
where(c(&GroupFileSchema::file_type) != (int)GroupFileSchema::TO_DELETE),
order_by(&GroupFileSchema::id),
limit(10));
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::size),
where(c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE),
order_by(&TableFileSchema::id),
limit(10));
std::vector<int> ids;
TableFileSchema table_file;
for (auto& file : selected) {
if (to_discard_size <= 0) break;
GroupFileSchema group_file;
group_file.id = std::get<0>(file);
group_file.size = std::get<1>(file);
ids.push_back(group_file.id);
LOG(DEBUG) << "Discard group_file.id=" << group_file.id << " group_file.size=" << group_file.size;
to_discard_size -= group_file.size;
table_file.id = std::get<0>(file);
table_file.size = std::get<1>(file);
ids.push_back(table_file.id);
LOG(DEBUG) << "Discard table_file.id=" << table_file.file_id << " table_file.size=" << table_file.size;
to_discard_size -= table_file.size;
}
if (ids.size() == 0) {
@ -541,10 +523,10 @@ Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
ConnectorPtr->update_all(
set(
c(&GroupFileSchema::file_type) = (int)GroupFileSchema::TO_DELETE
c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE
),
where(
in(&GroupFileSchema::id, ids)
in(&TableFileSchema::id, ids)
));
} catch (std::exception & e) {
@ -553,22 +535,22 @@ Status DBMetaImpl::discard_files_of_size(long to_discard_size) {
}
return discard_files_of_size(to_discard_size);
return DiscardFiles(to_discard_size);
}
// NOTE(review): this span shows the pre-rename (update_group_file) and
// post-rename (UpdateTableFile) versions of each changed line back to back.
//
// Stamps the record's updated_time with utils::GetMicroSecTimeStamp() and
// writes it through ConnectorPtr->update(). On std::exception the error text
// and the record's identifying fields are logged at DEBUG level and the
// exception is thrown again; otherwise Status::OK() is returned.
Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) {
group_file.updated_time = utils::GetMicroSecTimeStamp();
Status DBMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
file_schema.updated_time = utils::GetMicroSecTimeStamp();
try {
ConnectorPtr->update(group_file);
ConnectorPtr->update(file_schema);
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id;
LOG(DEBUG) << "table_id= " << file_schema.table_id << " file_id=" << file_schema.file_id;
// NOTE(review): `throw e;` throws a copy whose static type is
// std::exception, slicing any derived exception; a bare `throw;` would
// rethrow the original object unchanged.
throw e;
}
return Status::OK();
}
Status DBMetaImpl::update_files(GroupFilesSchema& files) {
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
try {
auto commited = ConnectorPtr->transaction([&] () mutable {
for (auto& file : files) {
@ -587,34 +569,34 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) {
return Status::OK();
}
Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE and
c(&GroupFileSchema::updated_time) > now - seconds*US_PS));
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_DELETE and
c(&TableFileSchema::updated_time) > now - seconds*US_PS));
GroupFilesSchema updated;
TableFilesSchema updated;
TableFileSchema table_file;
for (auto& file : selected) {
GroupFileSchema group_file;
group_file.id = std::get<0>(file);
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
GetGroupFilePath(group_file);
if (group_file.file_type == GroupFileSchema::TO_DELETE) {
boost::filesystem::remove(group_file.location);
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
table_file.file_type = std::get<3>(file);
table_file.size = std::get<4>(file);
table_file.date = std::get<5>(file);
GetTableFilePath(table_file);
if (table_file.file_type == TableFileSchema::TO_DELETE) {
boost::filesystem::remove(table_file.location);
}
ConnectorPtr->remove<GroupFileSchema>(group_file.id);
/* LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; */
ConnectorPtr->remove<TableFileSchema>(table_file.id);
/* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */
}
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
@ -624,33 +606,33 @@ Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
return Status::OK();
}
Status DBMetaImpl::cleanup() {
Status DBMetaImpl::CleanUp() {
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id,
&GroupFileSchema::group_id,
&GroupFileSchema::file_id,
&GroupFileSchema::file_type,
&GroupFileSchema::size,
&GroupFileSchema::date),
where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_DELETE or
c(&GroupFileSchema::file_type) == (int)GroupFileSchema::NEW));
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_DELETE or
c(&TableFileSchema::file_type) == (int)TableFileSchema::NEW));
GroupFilesSchema updated;
TableFilesSchema updated;
TableFileSchema table_file;
for (auto& file : selected) {
GroupFileSchema group_file;
group_file.id = std::get<0>(file);
group_file.group_id = std::get<1>(file);
group_file.file_id = std::get<2>(file);
group_file.file_type = std::get<3>(file);
group_file.size = std::get<4>(file);
group_file.date = std::get<5>(file);
GetGroupFilePath(group_file);
if (group_file.file_type == GroupFileSchema::TO_DELETE) {
boost::filesystem::remove(group_file.location);
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
table_file.file_type = std::get<3>(file);
table_file.size = std::get<4>(file);
table_file.date = std::get<5>(file);
GetTableFilePath(table_file);
if (table_file.file_type == TableFileSchema::TO_DELETE) {
boost::filesystem::remove(table_file.location);
}
ConnectorPtr->remove<GroupFileSchema>(group_file.id);
/* LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; */
ConnectorPtr->remove<TableFileSchema>(table_file.id);
/* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */
}
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
@ -660,19 +642,19 @@ Status DBMetaImpl::cleanup() {
return Status::OK();
}
Status DBMetaImpl::count(const std::string& group_id, long& result) {
Status DBMetaImpl::Count(const std::string& table_id, long& result) {
try {
auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size,
&GroupFileSchema::date),
where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or
c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or
c(&GroupFileSchema::file_type) == (int)GroupFileSchema::INDEX) and
c(&GroupFileSchema::group_id) == group_id));
auto selected = ConnectorPtr->select(columns(&TableFileSchema::size,
&TableFileSchema::date),
where((c(&TableFileSchema::file_type) == (int)TableFileSchema::RAW or
c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_INDEX or
c(&TableFileSchema::file_type) == (int)TableFileSchema::INDEX) and
c(&TableFileSchema::table_id) == table_id));
GroupSchema group_info;
group_info.group_id = group_id;
auto status = get_group_no_lock(group_info);
TableSchema table_schema;
table_schema.table_id = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
@ -682,7 +664,7 @@ Status DBMetaImpl::count(const std::string& group_id, long& result) {
result += std::get<0>(file);
}
result /= group_info.dimension;
result /= table_schema.dimension;
} catch (std::exception & e) {
LOG(DEBUG) << e.what();
@ -691,15 +673,15 @@ Status DBMetaImpl::count(const std::string& group_id, long& result) {
return Status::OK();
}
// NOTE(review): this span shows the pre-rename (drop_all, _options) and
// post-rename (DropAll, options_) versions of each changed line back to back.
//
// Recursively removes the configured options path from disk when it is a
// directory. Always returns Status::OK(), whether or not anything was removed.
// NOTE(review): the throwing boost::filesystem overloads are used and nothing
// here catches filesystem errors — confirm callers expect that.
Status DBMetaImpl::drop_all() {
if (boost::filesystem::is_directory(_options.path)) {
boost::filesystem::remove_all(_options.path);
Status DBMetaImpl::DropAll() {
if (boost::filesystem::is_directory(options_.path)) {
boost::filesystem::remove_all(options_.path);
}
return Status::OK();
}
// Destructor: runs the cleanup routine (cleanup() before the rename, CleanUp()
// after it; both lines are shown). The Status it returns is discarded.
DBMetaImpl::~DBMetaImpl() {
cleanup();
CleanUp();
}
} // namespace meta

View File

@ -19,62 +19,53 @@ class DBMetaImpl : public Meta {
public:
DBMetaImpl(const DBMetaOptions& options_);
virtual Status add_group(GroupSchema& group_info) override;
virtual Status get_group(GroupSchema& group_info_) override;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) override;
virtual Status CreateTable(TableSchema& table_schema) override;
virtual Status DescribeTable(TableSchema& group_info_) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status add_group_file(GroupFileSchema& group_file_info) override;
virtual Status delete_group_partitions(const std::string& group_id,
const meta::DatesT& dates) override;
virtual Status CreateTableFile(TableFileSchema& file_schema) override;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) override;
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
bool& has_or_not_) override;
virtual Status get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) override;
virtual Status update_group_file(GroupFileSchema& group_file_) override;
virtual Status GetTableFile(TableFileSchema& file_schema) override;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) override;
virtual Status UpdateTableFile(TableFileSchema& file_schema) override;
virtual Status update_files(GroupFilesSchema& files) override;
virtual Status UpdateTableFiles(TableFilesSchema& files) override;
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) override;
virtual Status files_to_search(const std::string& group_id,
virtual Status FilesToSearch(const std::string& table_id,
const DatesT& partition,
DatePartionedGroupFilesSchema& files) override;
DatePartionedTableFilesSchema& files) override;
virtual Status files_to_index(GroupFilesSchema&) override;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) override;
virtual Status archive_files() override;
virtual Status FilesToIndex(TableFilesSchema&) override;
virtual Status size(long& result) override;
virtual Status Archive() override;
virtual Status cleanup() override;
virtual Status Size(long& result) override;
virtual Status cleanup_ttl_files(uint16_t seconds) override;
virtual Status CleanUp() override;
virtual Status drop_all() override;
virtual Status CleanUpFilesWithTTL(uint16_t seconds) override;
virtual Status count(const std::string& group_id, long& result) override;
virtual Status DropAll() override;
virtual Status Count(const std::string& table_id, long& result) override;
virtual ~DBMetaImpl();
private:
Status NextFileId(std::string& file_id);
Status NextGroupId(std::string& group_id);
Status discard_files_of_size(long to_discard_size);
Status get_group_no_lock(GroupSchema& group_info);
std::string GetGroupPath(const std::string& group_id);
std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
void GetGroupFilePath(GroupFileSchema& group_file);
Status initialize();
Status NextTableId(std::string& table_id);
Status DiscardFiles(long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize();
const DBMetaOptions _options;
const DBMetaOptions options_;
}; // DBMetaImpl
} // namespace meta

View File

@ -13,66 +13,66 @@ namespace vecwise {
namespace engine {
// Constructs an Env with both state flags cleared: the background worker has
// not been started and no shutdown is in progress. The old (_name) and new
// (name_) spellings of the initializer list are shown back to back.
Env::Env()
: _bg_work_started(false),
_shutting_down(false) {
: bg_work_started_(false),
shutting_down_(false) {
}
// NOTE(review): this span shows the pre-rename (schedule) and post-rename
// (Schedule) versions of each changed line back to back.
//
// Queues `function(arg)` for execution on the background worker thread.
// The whole body runs under the background-work mutex. The request is
// silently dropped when a shutdown is in progress.
void Env::schedule(void (*function_)(void* arg_), void* arg_) {
std::unique_lock<std::mutex> lock(_bg_work_mutex);
if (_shutting_down) return;
void Env::Schedule(void (*function)(void* arg), void* arg) {
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (shutting_down_) return;
// Start the worker on first use; it is detached, so nothing joins it.
if (!_bg_work_started) {
_bg_work_started = true;
if (!bg_work_started_) {
bg_work_started_ = true;
std::thread bg_thread(Env::BackgroundThreadEntryPoint, this);
bg_thread.detach();
}
// Signal only when the queue was empty before this push. The mutex is still
// held here, so the item is enqueued before a woken waiter can reacquire it.
if (_bg_work_queue.empty()) {
_bg_work_cv.notify_one();
if (bg_work_queue_.empty()) {
bg_work_cv_.notify_one();
}
_bg_work_queue.emplace(function_, arg_);
bg_work_queue_.emplace(function, arg);
}
// NOTE(review): this span shows the pre-rename (backgroud_thread_main) and
// post-rename (BackgroundThreadMain) versions of each changed line back to back.
//
// Worker loop: repeatedly takes the front entry of the work queue and runs it
// until the shutting-down flag is observed. On exit it clears the "started"
// flag and notifies all waiters on the condition variable.
void Env::backgroud_thread_main() {
while (!_shutting_down) {
std::unique_lock<std::mutex> lock(_bg_work_mutex);
while (_bg_work_queue.empty() && !_shutting_down) {
_bg_work_cv.wait(lock);
void Env::BackgroundThreadMain() {
while (!shutting_down_) {
std::unique_lock<std::mutex> lock(bg_work_mutex_);
// Sleep until there is work or a shutdown has been requested.
while (bg_work_queue_.empty() && !shutting_down_) {
bg_work_cv_.wait(lock);
}
// Shutdown wins over pending work: entries still queued are not run.
if (_shutting_down) break;
if (shutting_down_) break;
assert(!_bg_work_queue.empty());
auto bg_function = _bg_work_queue.front()._function;
void* bg_arg = _bg_work_queue.front()._arg;
_bg_work_queue.pop();
assert(!bg_work_queue_.empty());
auto bg_function = bg_work_queue_.front().function_;
void* bg_arg = bg_work_queue_.front().arg_;
bg_work_queue_.pop();
// Release the mutex before running the task so the task itself does not
// execute while the queue is locked.
lock.unlock();
bg_function(bg_arg);
}
std::unique_lock<std::mutex> lock(_bg_work_mutex);
_bg_work_started = false;
_bg_work_cv.notify_all();
std::unique_lock<std::mutex> lock(bg_work_mutex_);
bg_work_started_ = false;
bg_work_cv_.notify_all();
}
// Requests the background worker to exit and blocks until it has cleared the
// "started" flag, then resets the shutting-down flag.
// Old (_name) and new (name_) member spellings are shown back to back.
void Env::Stop() {
{
// Nothing to do if a stop is already in progress or no worker was started.
std::unique_lock<std::mutex> lock(_bg_work_mutex);
if (_shutting_down || !_bg_work_started) return;
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (shutting_down_ || !bg_work_started_) return;
}
// NOTE(review): the flag is written between the two locked scopes, i.e.
// without holding the mutex.
_shutting_down = true;
shutting_down_ = true;
{
std::unique_lock<std::mutex> lock(_bg_work_mutex);
if (_bg_work_queue.empty()) {
_bg_work_cv.notify_one();
std::unique_lock<std::mutex> lock(bg_work_mutex_);
// Wake a worker that may be waiting on an empty queue.
if (bg_work_queue_.empty()) {
bg_work_cv_.notify_one();
}
// Wait for the worker to announce that it has left its loop.
while (_bg_work_started) {
_bg_work_cv.wait(lock);
while (bg_work_started_) {
bg_work_cv_.wait(lock);
}
}
_shutting_down = false;
shutting_down_ = false;
}
// Empty destructor; it does not call Stop().
Env::~Env() {}

View File

@ -22,7 +22,7 @@ public:
Env(const Env&) = delete;
Env& operator=(const Env&) = delete;
void schedule(void (*function_)(void* arg_), void* arg_);
void Schedule(void (*function)(void* arg), void* arg);
virtual void Stop();
@ -31,25 +31,24 @@ public:
static Env* Default();
protected:
void backgroud_thread_main();
void BackgroundThreadMain();
// Static entry point that forwards to the worker loop of the given Env
// instance (backgroud_thread_main before the rename, BackgroundThreadMain
// after it; both lines are shown).
static void BackgroundThreadEntryPoint(Env* env) {
env->backgroud_thread_main();
env->BackgroundThreadMain();
}
// One queued unit of background work: a plain function pointer plus the
// opaque argument to pass to it. Both members are const, so an entry cannot
// be modified after construction.
// Old (_name) and new (name_) member spellings are shown back to back.
struct BGWork {
explicit BGWork(void (*function_)(void*), void* arg_)
: _function(function_), _arg(arg_) {}
explicit BGWork(void (*function)(void*), void* arg)
: function_(function), arg_(arg) {}
void (* const _function)(void*);
void* const _arg;
void (* const function_)(void*);
void* const arg_;
};
std::mutex _bg_work_mutex;
std::condition_variable _bg_work_cv;
std::queue<BGWork> _bg_work_queue;
bool _bg_work_started;
std::atomic<bool> _shutting_down;
std::mutex bg_work_mutex_;
std::condition_variable bg_work_cv_;
std::queue<BGWork> bg_work_queue_;
bool bg_work_started_;
std::atomic<bool> shutting_down_;
}; // Env
} // namespace engine

View File

@ -3,9 +3,10 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <easylogging++.h>
#include "ExecutionEngine.h"
#include <easylogging++.h>
namespace zilliz {
namespace vecwise {
namespace engine {

View File

@ -5,11 +5,11 @@
******************************************************************************/
#pragma once
#include "Status.h"
#include <vector>
#include <memory>
#include "Status.h"
namespace zilliz {
namespace vecwise {
namespace engine {

View File

@ -3,6 +3,11 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "Factories.h"
#include "DBImpl.h"
#include "FaissExecutionEngine.h"
#include "Traits.h"
#include <stdlib.h>
#include <time.h>
#include <sstream>
@ -11,12 +16,6 @@
#include <assert.h>
#include <easylogging++.h>
#include "Factories.h"
#include "DBImpl.h"
#include "FaissExecutionEngine.h"
#include "Traits.h"
namespace zilliz {
namespace vecwise {
namespace engine {

View File

@ -3,15 +3,15 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <string>
#include <memory>
#include "DB.h"
#include "DBMetaImpl.h"
#include "Options.h"
#include <string>
#include <memory>
namespace zilliz {
namespace vecwise {
namespace engine {

View File

@ -5,11 +5,11 @@
******************************************************************************/
#pragma once
#include "ExecutionEngine.h"
#include <memory>
#include <string>
#include "ExecutionEngine.h"
namespace faiss {
class Index;
}
@ -22,7 +22,7 @@ namespace engine {
template<class IndexTrait>
class FaissExecutionEngine : public ExecutionEngine<FaissExecutionEngine<IndexTrait>> {
public:
typedef std::shared_ptr<FaissExecutionEngine<IndexTrait>> Ptr;
using Ptr = std::shared_ptr<FaissExecutionEngine<IndexTrait>>;
FaissExecutionEngine(uint16_t dimension, const std::string& location);
FaissExecutionEngine(std::shared_ptr<faiss::Index> index, const std::string& location);
@ -53,7 +53,9 @@ public:
Ptr BuildIndex(const std::string&);
Status Cache();
protected:
std::shared_ptr<faiss::Index> pIndex_;
std::string location_;
};
@ -63,4 +65,4 @@ protected:
} // namespace vecwise
} // namespace zilliz
#include "FaissExecutionEngine.cpp"
#include "FaissExecutionEngine.inl"

View File

@ -3,8 +3,9 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#ifndef FAISSEXECUTIONENGINE_CPP__
#define FAISSEXECUTIONENGINE_CPP__
#pragma once
#include "FaissExecutionEngine.h"
#include <easylogging++.h>
#include <faiss/AutoTune.h>
@ -15,8 +16,6 @@
#include <wrapper/IndexBuilder.h>
#include <cache/CpuCacheMgr.h>
#include "FaissExecutionEngine.h"
namespace zilliz {
namespace vecwise {
namespace engine {
@ -135,5 +134,3 @@ Status FaissExecutionEngine<IndexTrait>::Cache() {
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#endif

View File

@ -3,30 +3,29 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "IDGenerator.h"
#include <chrono>
#include <assert.h>
#include <iostream>
#include "IDGenerator.h"
namespace zilliz {
namespace vecwise {
namespace engine {
// Out-of-line definition of the interface's destructor; the body is empty.
IDGenerator::~IDGenerator() {}
// NOTE(review): the pre-rename (getNextIDNumber) and post-rename
// (GetNextIDNumber) signature lines are shown back to back.
//
// Returns an ID derived from the wall clock: the std::chrono::system_clock
// time since epoch in microseconds, multiplied by MAX_IDS_PER_MICRO.
// NOTE(review): two calls within the same microsecond yield the same value.
IDNumber SimpleIDGenerator::getNextIDNumber() {
IDNumber SimpleIDGenerator::GetNextIDNumber() {
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count();
return micros * MAX_IDS_PER_MICRO;
}
void SimpleIDGenerator::nextIDNumbers(size_t n, IDNumbers& ids) {
void SimpleIDGenerator::NextIDNumbers(size_t n, IDNumbers& ids) {
if (n > MAX_IDS_PER_MICRO) {
nextIDNumbers(n-MAX_IDS_PER_MICRO, ids);
nextIDNumbers(MAX_IDS_PER_MICRO, ids);
NextIDNumbers(n-MAX_IDS_PER_MICRO, ids);
NextIDNumbers(MAX_IDS_PER_MICRO, ids);
return;
}
if (n <= 0) {
@ -41,12 +40,11 @@ void SimpleIDGenerator::nextIDNumbers(size_t n, IDNumbers& ids) {
for (int pos=0; pos<n; ++pos) {
ids.push_back(micros+pos);
}
}
// NOTE(review): the pre-rename (getNextIDNumbers) and post-rename
// (GetNextIDNumbers) versions of each changed line are shown back to back.
//
// Replaces the contents of `ids` with `n` generated IDs: clears the vector,
// then delegates to the private batch helper to append them.
void SimpleIDGenerator::getNextIDNumbers(size_t n, IDNumbers& ids) {
void SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) {
ids.clear();
nextIDNumbers(n, ids);
NextIDNumbers(n, ids);
}

View File

@ -5,17 +5,19 @@
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <vector>
#include "Types.h"
#include <cstddef>
#include <vector>
namespace zilliz {
namespace vecwise {
namespace engine {
class IDGenerator {
public:
virtual IDNumber getNextIDNumber() = 0;
virtual void getNextIDNumbers(size_t n, IDNumbers& ids) = 0;
virtual IDNumber GetNextIDNumber() = 0;
virtual void GetNextIDNumbers(size_t n, IDNumbers& ids) = 0;
virtual ~IDGenerator();
@ -24,11 +26,11 @@ public:
// IDGenerator implementation declared with a single-ID getter, a batch
// getter, and a private batch helper.
// Pre-rename (lowerCamel) and post-rename (PascalCase) declarations are shown
// back to back.
class SimpleIDGenerator : public IDGenerator {
public:
virtual IDNumber getNextIDNumber() override;
virtual void getNextIDNumbers(size_t n, IDNumbers& ids) override;
virtual IDNumber GetNextIDNumber() override;
virtual void GetNextIDNumbers(size_t n, IDNumbers& ids) override;
private:
void nextIDNumbers(size_t n, IDNumbers& ids);
void NextIDNumbers(size_t n, IDNumbers& ids);
// Per-instance constant with a default member initializer of 1000.
const size_t MAX_IDS_PER_MICRO = 1000;
}; // SimpleIDGenerator

View File

@ -1,277 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <sys/stat.h>
#include <unistd.h>
#include <exception>
#include <fstream>
#include <iostream>
#include <sstream>
#include <boost/filesystem.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include "LocalMetaImpl.h"
#include "IDGenerator.h"
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
// Size of `filename` in bytes as reported by stat(); -1 when stat() fails.
long LocalMetaImpl::GetFileSize(const std::string& filename)
{
    struct stat file_stat;
    if (stat(filename.c_str(), &file_stat) != 0) {
        return -1;
    }
    return file_stat.st_size;
}
// Directory of a group: "<options.path>/<group_id>".
std::string LocalMetaImpl::GetGroupPath(const std::string& group_id) {
    auto group_path = _options.path + "/";
    group_path += group_id;
    return group_path;
}
// Directory of one date partition of a group: "<group path>/<date>".
std::string LocalMetaImpl::GetGroupDatePartitionPath(const std::string& group_id, DateT& date) {
    std::ostringstream partition_path;
    partition_path << GetGroupPath(group_id);
    partition_path << "/" << date;
    return partition_path.str();
}
std::string LocalMetaImpl::GetNextGroupFileLocationByPartition(const std::string& group_id, DateT& date,
GroupFileSchema::FILE_TYPE file_type) {
std::string suffix = (file_type == GroupFileSchema::RAW) ? ".raw" : ".index";
SimpleIDGenerator g;
std::stringstream ss;
ss << GetGroupPath(group_id) << "/" << date << "/" << g.getNextIDNumber() << suffix;
return ss.str();
}
// Path of the metadata file stored directly inside a group directory.
std::string LocalMetaImpl::GetGroupMetaPathByGroupPath(const std::string& group_path) {
    std::string meta_path(group_path);
    meta_path += "/meta";
    return meta_path;
}
// Path of the metadata file of the group identified by `group_id`.
std::string LocalMetaImpl::GetGroupMetaPath(const std::string& group_id) {
    const std::string group_path = GetGroupPath(group_id);
    return GetGroupMetaPathByGroupPath(group_path);
}
// Loads a group's metadata from the JSON file at `path` into `group_info`.
//
// `group_info.group_id` must already be set by the caller: it is parsed as the
// numeric id and used to derive the group location. The file must provide the
// "files_cnt" and "dimension" entries.
//
// Returns Status::OK() on success. A missing or malformed file, a missing
// entry, or a non-numeric value (including a non-numeric group_id) is reported
// as Status::GroupError instead of escaping as an exception; in that case
// `group_info` is left exactly as it was passed in.
Status LocalMetaImpl::GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info) {
    try {
        boost::property_tree::ptree ptree;
        boost::property_tree::read_json(path, ptree);

        // Parse everything first so a failure cannot leave group_info half-updated.
        const int id = std::stoi(group_info.group_id);
        const int files_cnt = std::stoi(ptree.get_child("files_cnt").data());
        const int dimension = std::stoi(ptree.get_child("dimension").data());

        group_info.id = id;
        group_info.files_cnt = files_cnt;
        group_info.dimension = dimension;
        group_info.location = GetGroupPath(group_info.group_id);
    } catch (const std::exception& e) {
        return Status::GroupError("Cannot load group meta " + path + ": " + e.what());
    }

    return Status::OK();
}
// Stamps `group_id` onto `group_info`, then fills the remaining fields from
// the group's metadata file.
Status LocalMetaImpl::GetGroupMetaInfo(const std::string& group_id, GroupSchema& group_info) {
    group_info.group_id = group_id;
    const std::string meta_path = GetGroupMetaPath(group_id);
    return GetGroupMetaInfoByPath(meta_path, group_info);
}
// Stores a copy of the options and makes sure the root directory exists.
LocalMetaImpl::LocalMetaImpl(const DBMetaOptions& options_)
    : _options(options_) {
    // A constructor cannot hand the Status back to the caller, so report a
    // failed initialization instead of dropping it silently.
    auto status = initialize();
    if (!status.ok()) {
        std::cerr << "LocalMetaImpl: " << status.ToString() << std::endl;
    }
}
// Ensures the root directory named by the options exists, creating it when
// it is not already a directory. Returns InvalidDBPath if creation fails.
Status LocalMetaImpl::initialize() {
    const bool already_present = boost::filesystem::is_directory(_options.path);
    if (!already_present && !boost::filesystem::create_directory(_options.path)) {
        return Status::InvalidDBPath("Cannot Create " + _options.path);
    }
    return Status::OK();
}
// Creates a new group: its directory plus a JSON metadata file holding
// "files_cnt" and "dimension".
//
// When `group_info.group_id` is empty a freshly generated id is used as the
// group id. On success `group_info` is updated with the final group_id, a zero
// files_cnt, a zero id and the group location. Returns GroupError if the
// group already exists or its directory cannot be created.
Status LocalMetaImpl::add_group(GroupSchema& group_info) {
    const size_t generated_id = SimpleIDGenerator().getNextIDNumber();

    std::string target_id = group_info.group_id;
    if (target_id.empty()) {
        std::stringstream id_text;
        id_text << generated_id;
        target_id = id_text.str();
    }

    bool already_exists;
    has_group(target_id, already_exists);
    if (already_exists) {
        return Status::GroupError("Group Already Existed " + target_id);
    }

    const std::string group_path = GetGroupPath(target_id);
    if (!boost::filesystem::create_directory(group_path)) {
        return Status::GroupError("Cannot Create Group " + target_id);
    }

    group_info.group_id = target_id;
    group_info.files_cnt = 0;
    group_info.id = 0;
    group_info.location = group_path;

    boost::property_tree::ptree meta;
    meta.put("files_cnt", group_info.files_cnt);
    meta.put("dimension", group_info.dimension);
    boost::property_tree::write_json(GetGroupMetaPath(target_id), meta);

    return Status::OK();
}
// Fills `group_info` for the group named by `group_info.group_id`.
// Returns NotFound when no directory exists for that group.
Status LocalMetaImpl::get_group(GroupSchema& group_info) {
    bool found;
    has_group(group_info.group_id, found);
    if (found) {
        return GetGroupMetaInfo(group_info.group_id, group_info);
    }
    return Status::NotFound("Group " + group_info.group_id + " Not Found");
}
// Sets `has_or_not` to whether the group's directory exists; always returns OK.
Status LocalMetaImpl::has_group(const std::string& group_id, bool& has_or_not) {
    const std::string group_path = GetGroupPath(group_id);
    has_or_not = boost::filesystem::is_directory(group_path);
    return Status::OK();
}
// Placeholder for registering a new file of a group.
// The intended logic is still commented out below, so the call currently
// leaves `group_file_info` untouched and always returns Status::OK().
Status LocalMetaImpl::add_group_file(GroupFileSchema& group_file_info) {
// Unused while the draft below stays disabled.
GroupSchema group_info;
/* auto status = get_group(group_info); */
/* if (!status.ok()) { */
/* return status; */
/* } */
/* auto location = GetNextGroupFileLocationByPartition(group_id, date, file_type); */
/* group_file_info.group_id = group_id; */
/* group_file_info.dimension = group_info.dimension; */
/* group_file_info.location = location; */
/* group_file_info.date = date; */
return Status::OK();
}
// Collects every non-".index" file larger than INDEX_TRIGGER_SIZE bytes.
//
// Layout walked: <root>/<group>/<date partition>/<file>. `files` is cleared
// first and then receives one entry (location, group_id, dimension) per
// matching file. Always returns Status::OK(); groups whose metadata cannot be
// loaded and files whose size cannot be read are skipped.
Status LocalMetaImpl::files_to_index(GroupFilesSchema& files) {
    namespace fs = boost::filesystem;
    files.clear();

    fs::directory_iterator end_itr;
    for (fs::directory_iterator group_itr(_options.path); group_itr != end_itr; ++group_itr) {
        // Only directories are groups; opening an iterator on a plain file throws.
        if (!fs::is_directory(group_itr->path())) continue;

        const std::string group_path = group_itr->path().string();
        GroupSchema group_info;
        // The directory name is the group id (see GetGroupPath); the metadata
        // loader needs it to be set before it is called.
        group_info.group_id = group_itr->path().filename().string();
        auto status = GetGroupMetaInfoByPath(GetGroupMetaPathByGroupPath(group_path), group_info);
        if (!status.ok()) continue;  // dimension unknown: nothing usable to report

        for (fs::directory_iterator part_itr(group_path); part_itr != end_itr; ++part_itr) {
            // The group directory also holds the "meta" file, which is not a partition.
            if (!fs::is_directory(part_itr->path())) continue;

            for (fs::directory_iterator file_itr(part_itr->path()); file_itr != end_itr; ++file_itr) {
                const std::string location = file_itr->path().string();
                const std::string suffix = location.substr(location.find_last_of('.') + 1);
                if (suffix == "index") continue;

                // GetFileSize() returns -1 on failure; comparing that directly
                // with the unsigned threshold would treat it as a huge file.
                const long file_size = GetFileSize(location);
                if (file_size < 0) continue;
                if (static_cast<size_t>(file_size) <= INDEX_TRIGGER_SIZE) continue;

                std::cout << "[About to index] " << location << std::endl;
                GroupFileSchema f;
                f.location = location;
                f.group_id = group_info.group_id;
                f.dimension = group_info.dimension;
                files.push_back(f);
            }
        }
    }

    return Status::OK();
}
// Placeholder for collecting the files of a group that should be merged.
// Only clears `files` and returns Status::OK(); `group_id` is not used.
// The commented-out draft below is a disabled directory walk over
// <root>/<group>/<partition>/<file>.
Status LocalMetaImpl::files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) {
files.clear();
/* std::string suffix; */
/* boost::filesystem::directory_iterator end_itr; */
/* for (boost::filesystem::directory_iterator itr(_options.path); itr != end_itr; ++itr) { */
/* auto group_path = itr->path().string(); */
/* GroupSchema group_info; */
/* GetGroupMetaInfoByPath(GetGroupMetaPathByGroupPath(group_path), group_info); */
/* for (boost::filesystem::directory_iterator innerItr(group_path); innerItr != end_itr; ++innerItr) { */
/* auto partition_path = innerItr->path().string(); */
/* for (boost::filesystem::directory_iterator fItr(partition_path); fItr != end_itr; ++fItr) { */
/* auto location = fItr->path().string(); */
/* suffix = location.substr(location.find_last_of('.') + 1); */
/* if (suffix == "index") continue; */
/* if (INDEX_TRIGGER_SIZE < GetFileSize(location)) continue; */
/* std::cout << "[About to index] " << location << std::endl; */
/* GroupFileSchema f; */
/* f.location = location; */
/* f.group_id = group_id; */
/* f.dimension = group_info.dimension; */
/* files.push_back(f); */
/* } */
/* } */
/* } */
return Status::OK();
}
// Placeholder for checking whether a file belongs to a group.
// Not implemented yet: no lookup is performed, `has_or_not_` is set to false
// and Status::OK() is returned.
Status LocalMetaImpl::has_group_file(const std::string& group_id_,
                                     const std::string& file_id_,
                                     bool& has_or_not_) {
    //PXU TODO
    // Give the out-parameter a defined value so callers never read an
    // indeterminate bool while the lookup is missing.
    has_or_not_ = false;
    return Status::OK();
}
// Placeholder for loading the description of one file of a group.
// Not implemented yet: `group_file_info_` is left untouched and Status::OK()
// is returned unconditionally.
Status LocalMetaImpl::get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) {
//PXU TODO
return Status::OK();
}
// Placeholder for listing the files of a group.
// Not implemented yet: `group_files_info_` is left untouched, `date_delta_`
// is ignored and Status::OK() is returned unconditionally.
Status LocalMetaImpl::get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) {
// PXU TODO
return Status::OK();
}
// Placeholder for persisting changes to one group file.
// Not implemented yet: nothing is written and Status::OK() is returned.
Status LocalMetaImpl::update_group_file(GroupFileSchema& group_file_) {
//PXU TODO
return Status::OK();
}
// Placeholder for persisting changes to a batch of group files.
// Not implemented yet: nothing is written and Status::OK() is returned.
Status LocalMetaImpl::update_files(GroupFilesSchema& files) {
//PXU TODO
return Status::OK();
}
// Placeholder for archiving files.
// Not implemented yet: does nothing and returns Status::OK().
Status LocalMetaImpl::archive_files() {
//PXU TODO
return Status::OK();
}
// Placeholder for the cleanup step.
// Not implemented yet: does nothing and returns Status::OK().
Status LocalMetaImpl::cleanup() {
//PXU TODO
return Status::OK();
}
// Placeholder for the TTL-based file cleanup.
// Not implemented yet: `seconds` is ignored and Status::OK() is returned.
Status LocalMetaImpl::cleanup_ttl_files(uint16_t seconds) {
// PXU TODO
return Status::OK();
}
// Placeholder for dropping all stored metadata.
// Not implemented yet: nothing is removed and Status::OK() is returned.
Status LocalMetaImpl::drop_all() {
// PXU TODO
return Status::OK();
}
// Placeholder for reporting the total stored size.
// Not implemented yet: nothing is measured, `result` is set to 0 and
// Status::OK() is returned.
Status LocalMetaImpl::size(long& result) {
    // PXU TODO
    // Give the out-parameter a defined value so callers never read an
    // indeterminate number while the computation is missing.
    result = 0;
    return Status::OK();
}
// Placeholder for counting the entries of a group.
// Not implemented yet: `group_id` is ignored, `result` is set to 0 and
// Status::OK() is returned.
Status LocalMetaImpl::count(const std::string& group_id, long& result) {
    // PXU TODO
    // Give the out-parameter a defined value so callers never read an
    // indeterminate number while the computation is missing.
    result = 0;
    return Status::OK();
}
} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz

View File

@ -1,83 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include "Meta.h"
#include "Options.h"
namespace zilliz {
namespace vecwise {
namespace engine {
namespace meta {
// Meta implementation that takes its configuration from DBMetaOptions.
class LocalMetaImpl : public Meta {
public:
    // Size threshold in bytes (500 * 1024 * 1024).
    const size_t INDEX_TRIGGER_SIZE = 500*1024*1024;

    LocalMetaImpl(const DBMetaOptions& options_);

    // --- groups ---
    Status add_group(GroupSchema& group_info_) override;
    Status get_group(GroupSchema& group_info_) override;
    Status has_group(const std::string& group_id_, bool& has_or_not_) override;

    // --- group files ---
    Status add_group_file(GroupFileSchema& group_file_info) override;
    /* virtual Status delete_group_partitions(const std::string& group_id, */
    /*         const meta::DatesT& dates) override; */

    Status has_group_file(const std::string& group_id_,
                          const std::string& file_id_,
                          bool& has_or_not_) override;
    Status get_group_file(const std::string& group_id_,
                          const std::string& file_id_,
                          GroupFileSchema& group_file_info_) override;
    Status update_group_file(GroupFileSchema& group_file_) override;

    Status get_group_files(const std::string& group_id_,
                           const int date_delta_,
                           GroupFilesSchema& group_files_info_) override;

    Status update_files(GroupFilesSchema& files) override;

    // --- maintenance and scheduling queries ---
    Status cleanup() override;

    Status files_to_merge(const std::string& group_id,
                          DatePartionedGroupFilesSchema& files) override;

    Status files_to_index(GroupFilesSchema&) override;

    Status archive_files() override;

    Status cleanup_ttl_files(uint16_t seconds) override;

    Status count(const std::string& group_id, long& result) override;

    Status drop_all() override;

    Status size(long& result) override;

private:
    // Metadata loading and path construction helpers.
    Status GetGroupMetaInfoByPath(const std::string& path, GroupSchema& group_info);
    std::string GetGroupMetaPathByGroupPath(const std::string& group_path);
    Status GetGroupMetaInfo(const std::string& group_id, GroupSchema& group_info);
    std::string GetNextGroupFileLocationByPartition(const std::string& group_id, DateT& date,
            GroupFileSchema::FILE_TYPE file_type);
    std::string GetGroupDatePartitionPath(const std::string& group_id, DateT& date);
    std::string GetGroupPath(const std::string& group_id);
    std::string GetGroupMetaPath(const std::string& group_id);
    Status CreateGroupMeta(const GroupSchema& group_schema);
    long GetFileSize(const std::string& filename);

    Status initialize();

    const DBMetaOptions _options;
}; // LocalMetaImpl
} // namespace meta
} // namespace engine
} // namespace vecwise
} // namespace zilliz

View File

@ -5,15 +5,15 @@
******************************************************************************/
#pragma once
#include "IDGenerator.h"
#include "Status.h"
#include "Meta.h"
#include <map>
#include <string>
#include <ctime>
#include <memory>
#include <mutex>
#include "IDGenerator.h"
#include "Status.h"
#include "Meta.h"
namespace zilliz {
namespace vecwise {
@ -26,24 +26,24 @@ namespace meta {
template <typename EngineT>
class MemVectors {
public:
typedef typename EngineT::Ptr EnginePtr;
typedef typename meta::Meta::Ptr MetaPtr;
typedef std::shared_ptr<MemVectors<EngineT>> Ptr;
using EnginePtr = typename EngineT::Ptr;
using MetaPtr = meta::Meta::Ptr;
using Ptr = std::shared_ptr<MemVectors<EngineT>>;
explicit MemVectors(const std::shared_ptr<meta::Meta>&,
const meta::GroupFileSchema&, const Options&);
const meta::TableFileSchema&, const Options&);
void add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
void Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
size_t total() const;
size_t Total() const;
size_t approximate_size() const;
size_t ApproximateSize() const;
Status serialize(std::string& group_id);
Status Serialize(std::string& table_id);
~MemVectors();
const std::string& location() const { return schema_.location; }
const std::string& Location() const { return schema_.location; }
private:
MemVectors() = delete;
@ -52,8 +52,8 @@ private:
MetaPtr pMeta_;
Options options_;
meta::GroupFileSchema schema_;
IDGenerator* _pIdGenerator;
meta::TableFileSchema schema_;
IDGenerator* pIdGenerator_;
EnginePtr pEE_;
}; // MemVectors
@ -63,32 +63,32 @@ private:
template<typename EngineT>
class MemManager {
public:
typedef typename meta::Meta::Ptr MetaPtr;
typedef typename MemVectors<EngineT>::Ptr MemVectorsPtr;
typedef std::shared_ptr<MemManager<EngineT>> Ptr;
using MetaPtr = meta::Meta::Ptr;
using MemVectorsPtr = typename MemVectors<EngineT>::Ptr;
using Ptr = std::shared_ptr<MemManager<EngineT>>;
MemManager(const std::shared_ptr<meta::Meta>& meta_, const Options& options)
: _pMeta(meta_), options_(options) {}
MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options)
: pMeta_(meta), options_(options) {}
MemVectorsPtr get_mem_by_group(const std::string& group_id_);
MemVectorsPtr GetMemByTable(const std::string& table_id);
Status add_vectors(const std::string& group_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
Status serialize(std::vector<std::string>& group_ids);
Status Serialize(std::vector<std::string>& table_ids);
private:
Status add_vectors_no_lock(const std::string& group_id_,
size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status mark_memory_as_immutable();
Status InsertVectorsNoLock(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
Status ToImmutable();
typedef std::map<std::string, MemVectorsPtr> MemMap;
typedef std::vector<MemVectorsPtr> ImmMemPool;
MemMap _memMap;
ImmMemPool _immMems;
MetaPtr _pMeta;
using MemMap = std::map<std::string, MemVectorsPtr>;
using ImmMemPool = std::vector<MemVectorsPtr>;
MemMap memMap_;
ImmMemPool immMems_;
MetaPtr pMeta_;
Options options_;
std::mutex _mutex;
std::mutex mutex_;
std::mutex serialization_mtx_;
}; // MemManager
@ -96,4 +96,4 @@ private:
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#include "MemManager.cpp"
#include "MemManager.inl"

View File

@ -3,18 +3,16 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#ifndef MEMMANGE_CPP__
#define MEMMANGE_CPP__
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
#pragma once
#include "MemManager.h"
#include "Meta.h"
#include "MetaConsts.h"
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
namespace zilliz {
namespace vecwise {
@ -22,42 +20,42 @@ namespace engine {
template<typename EngineT>
MemVectors<EngineT>::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
const meta::GroupFileSchema& schema, const Options& options)
const meta::TableFileSchema& schema, const Options& options)
: pMeta_(meta_ptr),
options_(options),
schema_(schema),
_pIdGenerator(new SimpleIDGenerator()),
pIdGenerator_(new SimpleIDGenerator()),
pEE_(new EngineT(schema_.dimension, schema_.location)) {
}
template<typename EngineT>
void MemVectors<EngineT>::add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
_pIdGenerator->getNextIDNumbers(n_, vector_ids_);
void MemVectors<EngineT>::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
pIdGenerator_->GetNextIDNumbers(n_, vector_ids_);
pEE_->AddWithIds(n_, vectors_, vector_ids_.data());
}
template<typename EngineT>
size_t MemVectors<EngineT>::total() const {
size_t MemVectors<EngineT>::Total() const {
return pEE_->Count();
}
template<typename EngineT>
size_t MemVectors<EngineT>::approximate_size() const {
size_t MemVectors<EngineT>::ApproximateSize() const {
return pEE_->Size();
}
template<typename EngineT>
Status MemVectors<EngineT>::serialize(std::string& group_id) {
group_id = schema_.group_id;
auto size = approximate_size();
Status MemVectors<EngineT>::Serialize(std::string& table_id) {
table_id = schema_.table_id;
auto size = ApproximateSize();
pEE_->Serialize();
schema_.size = size;
schema_.file_type = (size >= options_.index_trigger_size) ?
meta::GroupFileSchema::TO_INDEX : meta::GroupFileSchema::RAW;
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = pMeta_->update_group_file(schema_);
auto status = pMeta_->UpdateTableFile(schema_);
LOG(DEBUG) << "New " << ((schema_.file_type == meta::GroupFileSchema::RAW) ? "raw" : "to_index")
LOG(DEBUG) << "New " << ((schema_.file_type == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id << " of size " << pEE_->Size() / meta::M << " M";
pEE_->Cache();
@ -67,9 +65,9 @@ Status MemVectors<EngineT>::serialize(std::string& group_id) {
template<typename EngineT>
MemVectors<EngineT>::~MemVectors() {
if (_pIdGenerator != nullptr) {
delete _pIdGenerator;
_pIdGenerator = nullptr;
if (pIdGenerator_ != nullptr) {
delete pIdGenerator_;
pIdGenerator_ = nullptr;
}
}
@ -78,69 +76,69 @@ MemVectors<EngineT>::~MemVectors() {
*/
template<typename EngineT>
typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::get_mem_by_group(
const std::string& group_id) {
auto memIt = _memMap.find(group_id);
if (memIt != _memMap.end()) {
typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::GetMemByTable(
const std::string& table_id) {
auto memIt = memMap_.find(table_id);
if (memIt != memMap_.end()) {
return memIt->second;
}
meta::GroupFileSchema group_file;
group_file.group_id = group_id;
auto status = _pMeta->add_group_file(group_file);
meta::TableFileSchema table_file;
table_file.table_id = table_id;
auto status = pMeta_->CreateTableFile(table_file);
if (!status.ok()) {
return nullptr;
}
_memMap[group_id] = MemVectorsPtr(new MemVectors<EngineT>(_pMeta, group_file, options_));
return _memMap[group_id];
memMap_[table_id] = MemVectorsPtr(new MemVectors<EngineT>(pMeta_, table_file, options_));
return memMap_[table_id];
}
template<typename EngineT>
Status MemManager<EngineT>::add_vectors(const std::string& group_id_,
Status MemManager<EngineT>::InsertVectors(const std::string& table_id_,
size_t n_,
const float* vectors_,
IDNumbers& vector_ids_) {
std::unique_lock<std::mutex> lock(_mutex);
return add_vectors_no_lock(group_id_, n_, vectors_, vector_ids_);
std::unique_lock<std::mutex> lock(mutex_);
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}
template<typename EngineT>
Status MemManager<EngineT>::add_vectors_no_lock(const std::string& group_id,
Status MemManager<EngineT>::InsertVectorsNoLock(const std::string& table_id,
size_t n,
const float* vectors,
IDNumbers& vector_ids) {
MemVectorsPtr mem = get_mem_by_group(group_id);
MemVectorsPtr mem = GetMemByTable(table_id);
if (mem == nullptr) {
return Status::NotFound("Group " + group_id + " not found!");
return Status::NotFound("Group " + table_id + " not found!");
}
mem->add(n, vectors, vector_ids);
mem->Add(n, vectors, vector_ids);
return Status::OK();
}
template<typename EngineT>
Status MemManager<EngineT>::mark_memory_as_immutable() {
std::unique_lock<std::mutex> lock(_mutex);
for (auto& kv: _memMap) {
_immMems.push_back(kv.second);
Status MemManager<EngineT>::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
for (auto& kv: memMap_) {
immMems_.push_back(kv.second);
}
_memMap.clear();
memMap_.clear();
return Status::OK();
}
template<typename EngineT>
Status MemManager<EngineT>::serialize(std::vector<std::string>& group_ids) {
mark_memory_as_immutable();
Status MemManager<EngineT>::Serialize(std::vector<std::string>& table_ids) {
ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
std::string group_id;
group_ids.clear();
for (auto& mem : _immMems) {
mem->serialize(group_id);
group_ids.push_back(group_id);
std::string table_id;
table_ids.clear();
for (auto& mem : immMems_) {
mem->Serialize(table_id);
table_ids.push_back(table_id);
}
_immMems.clear();
immMems_.clear();
return Status::OK();
}
@ -148,5 +146,3 @@ Status MemManager<EngineT>::serialize(std::vector<std::string>& group_ids) {
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#endif

View File

@ -3,9 +3,10 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Meta.h"
#include <ctime>
#include <stdio.h>
#include "Meta.h"
namespace zilliz {
namespace vecwise {

View File

@ -4,14 +4,15 @@
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <cstddef>
#include <ctime>
#include <memory>
#include "MetaTypes.h"
#include "Options.h"
#include "Status.h"
#include <cstddef>
#include <ctime>
#include <memory>
namespace zilliz {
namespace vecwise {
namespace engine {
@ -20,49 +21,40 @@ namespace meta {
class Meta {
public:
typedef std::shared_ptr<Meta> Ptr;
using Ptr = std::shared_ptr<Meta>;
virtual Status add_group(GroupSchema& group_info) = 0;
virtual Status get_group(GroupSchema& group_info) = 0;
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
virtual Status CreateTable(TableSchema& table_schema) = 0;
virtual Status DescribeTable(TableSchema& table_schema) = 0;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) = 0;
virtual Status add_group_file(GroupFileSchema& group_file_info) = 0;
virtual Status delete_group_partitions(const std::string& group_id,
const meta::DatesT& dates) = 0;
virtual Status CreateTableFile(TableFileSchema& file_schema) = 0;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) = 0;
virtual Status has_group_file(const std::string& group_id_,
const std::string& file_id_,
bool& has_or_not_) = 0;
virtual Status get_group_file(const std::string& group_id_,
const std::string& file_id_,
GroupFileSchema& group_file_info_) = 0;
virtual Status update_group_file(GroupFileSchema& group_file_) = 0;
virtual Status GetTableFile(TableFileSchema& file_schema) = 0;
virtual Status UpdateTableFile(TableFileSchema& file_schema) = 0;
virtual Status get_group_files(const std::string& group_id_,
const int date_delta_,
GroupFilesSchema& group_files_info_) = 0;
virtual Status UpdateTableFiles(TableFilesSchema& files) = 0;
virtual Status update_files(GroupFilesSchema& files) = 0;
virtual Status files_to_search(const std::string& group_id,
virtual Status FilesToSearch(const std::string& table_id,
const DatesT& partition,
DatePartionedGroupFilesSchema& files) = 0;
DatePartionedTableFilesSchema& files) = 0;
virtual Status files_to_merge(const std::string& group_id,
DatePartionedGroupFilesSchema& files) = 0;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) = 0;
virtual Status size(long& result) = 0;
virtual Status Size(long& result) = 0;
virtual Status archive_files() = 0;
virtual Status Archive() = 0;
virtual Status files_to_index(GroupFilesSchema&) = 0;
virtual Status FilesToIndex(TableFilesSchema&) = 0;
virtual Status cleanup() = 0;
virtual Status cleanup_ttl_files(uint16_t) = 0;
virtual Status CleanUp() = 0;
virtual Status CleanUpFilesWithTTL(uint16_t) = 0;
virtual Status drop_all() = 0;
virtual Status DropAll() = 0;
virtual Status count(const std::string& group_id, long& result) = 0;
virtual Status Count(const std::string& table_id, long& result) = 0;
static DateT GetDate(const std::time_t& t, int day_delta = 0);
static DateT GetDate();

View File

@ -18,16 +18,16 @@ typedef int DateT;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;
struct GroupSchema {
struct TableSchema {
size_t id;
std::string group_id;
std::string table_id;
size_t files_cnt = 0;
uint16_t dimension;
std::string location = "";
std::string location;
long created_on;
}; // GroupSchema
}; // TableSchema
struct GroupFileSchema {
struct TableFileSchema {
typedef enum {
NEW,
RAW,
@ -37,19 +37,19 @@ struct GroupFileSchema {
} FILE_TYPE;
size_t id;
std::string group_id;
std::string table_id;
std::string file_id;
int file_type = NEW;
size_t size;
DateT date = EmptyDate;
uint16_t dimension;
std::string location = "";
std::string location;
long updated_time;
long created_on;
}; // GroupFileSchema
}; // TableFileSchema
typedef std::vector<GroupFileSchema> GroupFilesSchema;
typedef std::map<DateT, GroupFilesSchema> DatePartionedGroupFilesSchema;
typedef std::vector<TableFileSchema> TableFilesSchema;
typedef std::map<DateT, TableFilesSchema> DatePartionedTableFilesSchema;
} // namespace meta
} // namespace engine

View File

@ -48,12 +48,6 @@ struct Options {
}; // Options
struct GroupOptions {
size_t dimension;
bool has_id = false;
}; // GroupOptions
} // namespace engine
} // namespace vecwise
} // namespace zilliz

View File

@ -3,9 +3,9 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Utils.h"
#include <chrono>
#include "Utils.h"
namespace zilliz {
namespace vecwise {

View File

@ -1,31 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "db_connection.h"
namespace zilliz {
namespace vecwise {
namespace engine {
using std::string;
using namespace sqlite_orm;
// Path of the SQLite database file that connect() passes to initStorage().
string storage_file_name = "default.sqlite";
// Creates the storage object for `storage_file_name`, synchronizes its
// schema, keeps the connection open and returns it as a shared handle.
SqliteDBPtr connect() {
    auto storage = std::make_shared<SqliteDB>(initStorage(storage_file_name));
    storage->sync_schema();
    storage->open_forever(); // thread safe option
    //storage->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log
    return storage;
}
/* SqliteDBPtr Connection::connect_ = connect(); */
}
}
}

View File

@ -1,64 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <string>
#include <memory>
#include <sqlite_orm.h>
namespace zilliz {
namespace vecwise {
namespace engine {
// Row type describing a group of vectors.
// Every member has a defined default, so a default-constructed GroupSchema can
// be read safely before it is filled in.
struct GroupSchema {
    size_t id = 0;                   // numeric row id
    std::string group_id;            // group identifier
    size_t files_cnt = 0;            // number of files counted for the group
    uint16_t dimension = 0;          // vector dimension
    std::string location;            // empty until assigned
    std::string next_file_location;  // empty until assigned
}; // GroupSchema
// Row type describing one file that belongs to a group.
// Every member has a defined default, so a default-constructed GroupFileSchema
// can be read safely before it is filled in.
struct GroupFileSchema {
    typedef enum {
        RAW,
        INDEX
    } FILE_TYPE;

    size_t id = 0;          // numeric row id
    std::string group_id;   // owning group
    std::string file_id;    // file identifier
    int files_type = RAW;   // one of FILE_TYPE
    size_t rows = 0;        // row count
    std::string location;   // empty until assigned
}; // GroupFileSchema
inline auto initStorage(const std::string &path) {
using namespace sqlite_orm;
return make_storage(path,
// Add table below
make_table("Groups",
make_column("id", &GroupSchema::id, primary_key()),
make_column("group_id", &GroupSchema::group_id, unique()),
make_column("dimension", &GroupSchema::dimension),
make_column("files_cnt", &GroupSchema::files_cnt, default_value(0))));
}
using SqliteDB = decltype(initStorage(""));
using SqliteDBPtr= std::shared_ptr<SqliteDB>;
// Declares one static storage handle that is accessible to derived classes.
// NOTE(review): only the declaration lives here; a definition of connect_ is
// required in some translation unit if the member is used - confirm where.
class Connection {
protected:
static SqliteDBPtr connect_;
};
}
}
}

View File

@ -78,17 +78,17 @@ BaseTaskPtr CreateTableTask::Create(const thrift::TableSchema& schema) {
ServerError CreateTableTask::OnExecute() {
TimeRecorder rc("CreateTableTask");
try {
if(schema_.vector_column_array.empty()) {
return SERVER_INVALID_ARGUMENT;
}
IVecIdMapper::GetInstance()->AddGroup(schema_.table_name);
engine::meta::GroupSchema group_info;
group_info.dimension = (uint16_t)schema_.vector_column_array[0].dimension;
group_info.group_id = schema_.table_name;
engine::Status stat = DB()->add_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.dimension = (uint16_t)schema_.vector_column_array[0].dimension;
table_schema.table_id = schema_.table_name;
engine::Status stat = DB()->CreateTable(table_schema);
if(!stat.ok()) {//could exist
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
@ -123,9 +123,9 @@ ServerError DescribeTableTask::OnExecute() {
TimeRecorder rc("DescribeTableTask");
try {
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -154,8 +154,8 @@ DeleteTableTask::DeleteTableTask(const std::string& table_name)
}
BaseTaskPtr DeleteTableTask::Create(const std::string& group_id) {
return std::shared_ptr<BaseTask>(new DeleteTableTask(group_id));
BaseTaskPtr DeleteTableTask::Create(const std::string& table_id) {
return std::shared_ptr<BaseTask>(new DeleteTableTask(table_id));
}
ServerError DeleteTableTask::OnExecute() {
@ -195,9 +195,9 @@ ServerError AddVectorTask::OnExecute() {
return SERVER_SUCCESS;
}
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -208,7 +208,7 @@ ServerError AddVectorTask::OnExecute() {
rc.Record("get group info");
uint64_t vec_count = (uint64_t)record_array_.size();
uint64_t group_dim = group_info.dimension;
uint64_t group_dim = table_schema.dimension;
std::vector<float> vec_f;
vec_f.resize(vec_count*group_dim);//allocate enough memory
for(uint64_t i = 0; i < vec_count; i++) {
@ -236,7 +236,7 @@ ServerError AddVectorTask::OnExecute() {
rc.Record("prepare vectors data");
stat = DB()->add_vectors(table_name_, vec_count, vec_f.data(), record_ids_);
stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
rc.Record("add vectors to engine");
if(!stat.ok()) {
error_code_ = SERVER_UNEXPECTED_ERROR;
@ -293,9 +293,9 @@ ServerError SearchVectorTask::OnExecute() {
return error_code_;
}
engine::meta::GroupSchema group_info;
group_info.group_id = table_name_;
engine::Status stat = DB()->get_group(group_info);
engine::meta::TableSchema table_schema;
table_schema.table_id = table_name_;
engine::Status stat = DB()->DescribeTable(table_schema);
if(!stat.ok()) {
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
@ -305,7 +305,7 @@ ServerError SearchVectorTask::OnExecute() {
std::vector<float> vec_f;
uint64_t record_count = (uint64_t)record_array_.size();
vec_f.resize(record_count*group_info.dimension);
vec_f.resize(record_count*table_schema.dimension);
for(uint64_t i = 0; i < record_array_.size(); i++) {
const auto& record = record_array_[i];
@ -317,9 +317,9 @@ ServerError SearchVectorTask::OnExecute() {
}
uint64_t vec_dim = record.vector_map.begin()->second.size() / sizeof(double);//how many double value?
if (vec_dim != group_info.dimension) {
if (vec_dim != table_schema.dimension) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_info.dimension;
<< " vs. group dimension:" << table_schema.dimension;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
@ -335,7 +335,7 @@ ServerError SearchVectorTask::OnExecute() {
std::vector<DB_DATE> dates;
engine::QueryResults results;
stat = DB()->search(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results);
stat = DB()->Query(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;

View File

@ -66,21 +66,21 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
static const int group_dim = 256;
long size;
engine::meta::GroupSchema group_info;
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.group_id = group_name;
engine::Status stat = db_->add_group(group_info);
group_info.table_id = group_name;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::GroupSchema group_info_get;
group_info_get.group_id = group_name;
stat = db_->get_group(group_info_get);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
db_->size(size);
db_->Size(size);
int d = 256;
int nb = 20;
float *xb = new float[d * nb];
@ -92,13 +92,13 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
int loop = 100000;
for (auto i=0; i<loop; ++i) {
db_->add_vectors(group_name, nb, xb, vector_ids);
db_->InsertVectors(group_name, nb, xb, vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
db_->size(size);
db_->Size(size);
LOG(DEBUG) << "size=" << size;
ASSERT_TRUE(size < 1 * engine::meta::G);
@ -111,14 +111,14 @@ TEST_F(DBTest, DB_TEST) {
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::GroupSchema group_info;
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.group_id = group_name;
engine::Status stat = db_->add_group(group_info);
group_info.table_id = group_name;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::GroupSchema group_info_get;
group_info_get.group_id = group_name;
stat = db_->get_group(group_info_get);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
@ -152,12 +152,12 @@ TEST_F(DBTest, DB_TEST) {
for (auto j=0; j<10; ++j) {
ss.str("");
db_->count(group_name, count);
db_->Size(count);
prev_count = count;
START_TIMER;
stat = db_->search(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/engine::meta::M << " M";
stat = db_->Query(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
STOP_TIMER(ss.str());
ASSERT_STATS(stat);
@ -179,10 +179,10 @@ TEST_F(DBTest, DB_TEST) {
for (auto i=0; i<loop; ++i) {
if (i==40) {
db_->add_vectors(group_name, qb, qxb, target_ids);
db_->InsertVectors(group_name, qb, qxb, target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->add_vectors(group_name, nb, xb, vector_ids);
db_->InsertVectors(group_name, nb, xb, vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
@ -197,14 +197,14 @@ TEST_F(DBTest, SEARCH_TEST) {
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::GroupSchema group_info;
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.group_id = group_name;
engine::Status stat = db_->add_group(group_info);
group_info.table_id = group_name;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::GroupSchema group_info_get;
group_info_get.group_id = group_name;
stat = db_->get_group(group_info_get);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->DescribeTable(group_info_get);
ASSERT_STATS(stat);
ASSERT_EQ(group_info_get.dimension, group_dim);
@ -238,7 +238,7 @@ TEST_F(DBTest, SEARCH_TEST) {
// insert data
const int batch_size = 100;
for (int j = 0; j < nb / batch_size; ++j) {
stat = db_->add_vectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids);
stat = db_->InsertVectors(group_name, batch_size, xb.data()+batch_size*j*group_dim, ids);
if (j == 200){ sleep(1);}
ASSERT_STATS(stat);
}
@ -246,7 +246,7 @@ TEST_F(DBTest, SEARCH_TEST) {
sleep(2); // wait until build index finish
engine::QueryResults results;
stat = db_->search(group_name, k, nq, xq.data(), results);
stat = db_->Query(group_name, k, nq, xq.data(), results);
ASSERT_STATS(stat);
// TODO(linxj): add groundTruth assert

View File

@ -18,76 +18,76 @@
using namespace zilliz::vecwise::engine;
TEST_F(MetaTest, GROUP_TEST) {
auto group_id = "meta_test_group";
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl_->add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl_->CreateTable(group);
ASSERT_TRUE(status.ok());
auto gid = group.id;
group.id = -1;
status = impl_->get_group(group);
status = impl_->DescribeTable(group);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group.id, gid);
ASSERT_EQ(group.group_id, group_id);
ASSERT_EQ(group.table_id, table_id);
group.group_id = "not_found";
status = impl_->get_group(group);
group.table_id = "not_found";
status = impl_->DescribeTable(group);
ASSERT_TRUE(!status.ok());
group.group_id = group_id;
status = impl_->add_group(group);
group.table_id = table_id;
status = impl_->CreateTable(group);
ASSERT_TRUE(!status.ok());
}
TEST_F(MetaTest, GROUP_FILE_TEST) {
auto group_id = "meta_test_group";
TEST_F(MetaTest, table_file_TEST) {
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl_->add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl_->CreateTable(group);
meta::GroupFileSchema group_file;
group_file.group_id = group.group_id;
status = impl_->add_group_file(group_file);
meta::TableFileSchema table_file;
table_file.table_id = group.table_id;
status = impl_->CreateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group_file.file_type, meta::GroupFileSchema::NEW);
ASSERT_EQ(table_file.file_type, meta::TableFileSchema::NEW);
auto file_id = group_file.file_id;
auto file_id = table_file.file_id;
auto new_file_type = meta::GroupFileSchema::INDEX;
group_file.file_type = new_file_type;
auto new_file_type = meta::TableFileSchema::INDEX;
table_file.file_type = new_file_type;
status = impl_->update_group_file(group_file);
status = impl_->UpdateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group_file.file_type, new_file_type);
ASSERT_EQ(table_file.file_type, new_file_type);
meta::DatesT dates;
dates.push_back(meta::Meta::GetDate());
status = impl_->delete_group_partitions(group_file.group_id, dates);
status = impl_->DropPartitionsByDates(table_file.table_id, dates);
ASSERT_FALSE(status.ok());
dates.clear();
for (auto i=2; i < 10; ++i) {
dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
}
status = impl_->delete_group_partitions(group_file.group_id, dates);
status = impl_->DropPartitionsByDates(table_file.table_id, dates);
ASSERT_TRUE(status.ok());
group_file.date = meta::Meta::GetDateWithDelta(-2);
status = impl_->update_group_file(group_file);
table_file.date = meta::Meta::GetDateWithDelta(-2);
status = impl_->UpdateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(group_file.date, meta::Meta::GetDateWithDelta(-2));
ASSERT_FALSE(group_file.file_type == meta::GroupFileSchema::TO_DELETE);
ASSERT_EQ(table_file.date, meta::Meta::GetDateWithDelta(-2));
ASSERT_FALSE(table_file.file_type == meta::TableFileSchema::TO_DELETE);
dates.clear();
dates.push_back(group_file.date);
status = impl_->delete_group_partitions(group_file.group_id, dates);
dates.push_back(table_file.date);
status = impl_->DropPartitionsByDates(table_file.table_id, dates);
ASSERT_TRUE(status.ok());
status = impl_->get_group_file(group_file.group_id, group_file.file_id, group_file);
status = impl_->GetTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(group_file.file_type == meta::GroupFileSchema::TO_DELETE);
ASSERT_TRUE(table_file.file_type == meta::TableFileSchema::TO_DELETE);
}
TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
@ -100,44 +100,44 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
options.archive_conf = ArchiveConf("delete", ss.str());
auto impl = meta::DBMetaImpl(options);
auto group_id = "meta_test_group";
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl.add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl.CreateTable(group);
meta::GroupFilesSchema files;
meta::GroupFileSchema group_file;
group_file.group_id = group.group_id;
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id = group.table_id;
auto cnt = 100;
long ts = utils::GetMicroSecTimeStamp();
std::vector<int> days;
for (auto i=0; i<cnt; ++i) {
status = impl.add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::NEW;
status = impl.CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::NEW;
int day = rand() % (days_num*2);
group_file.created_on = ts - day*meta::D_SEC*meta::US_PS - 10000;
status = impl.update_group_file(group_file);
files.push_back(group_file);
table_file.created_on = ts - day*meta::D_SEC*meta::US_PS - 10000;
status = impl.UpdateTableFile(table_file);
files.push_back(table_file);
days.push_back(day);
}
impl.archive_files();
impl.Archive();
int i = 0;
for (auto file : files) {
status = impl.get_group_file(file.group_id, file.file_id, file);
status = impl.GetTableFile(file);
ASSERT_TRUE(status.ok());
if (days[i] < days_num) {
ASSERT_EQ(file.file_type, meta::GroupFileSchema::NEW);
ASSERT_EQ(file.file_type, meta::TableFileSchema::NEW);
} else {
ASSERT_EQ(file.file_type, meta::GroupFileSchema::TO_DELETE);
ASSERT_EQ(file.file_type, meta::TableFileSchema::TO_DELETE);
}
i++;
}
impl.drop_all();
impl.DropAll();
}
TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
@ -146,100 +146,100 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
options.archive_conf = ArchiveConf("delete", "disk:11");
auto impl = meta::DBMetaImpl(options);
auto group_id = "meta_test_group";
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl.add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl.CreateTable(group);
meta::GroupFilesSchema files;
meta::GroupFileSchema group_file;
group_file.group_id = group.group_id;
meta::TableFilesSchema files;
meta::TableFileSchema table_file;
table_file.table_id = group.table_id;
auto cnt = 10;
auto each_size = 2UL;
for (auto i=0; i<cnt; ++i) {
status = impl.add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::NEW;
group_file.size = each_size * meta::G;
status = impl.update_group_file(group_file);
files.push_back(group_file);
status = impl.CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::NEW;
table_file.size = each_size * meta::G;
status = impl.UpdateTableFile(table_file);
files.push_back(table_file);
}
impl.archive_files();
impl.Archive();
int i = 0;
for (auto file : files) {
status = impl.get_group_file(file.group_id, file.file_id, file);
status = impl.GetTableFile(file);
ASSERT_TRUE(status.ok());
if (i < 5) {
ASSERT_TRUE(file.file_type == meta::GroupFileSchema::TO_DELETE);
ASSERT_TRUE(file.file_type == meta::TableFileSchema::TO_DELETE);
} else {
ASSERT_EQ(file.file_type, meta::GroupFileSchema::NEW);
ASSERT_EQ(file.file_type, meta::TableFileSchema::NEW);
}
++i;
}
impl.drop_all();
impl.DropAll();
}
TEST_F(MetaTest, GROUP_FILES_TEST) {
auto group_id = "meta_test_group";
TEST_F(MetaTest, TABLE_FILES_TEST) {
auto table_id = "meta_test_group";
meta::GroupSchema group;
group.group_id = group_id;
auto status = impl_->add_group(group);
meta::TableSchema group;
group.table_id = table_id;
auto status = impl_->CreateTable(group);
int new_files_cnt = 4;
int raw_files_cnt = 5;
int to_index_files_cnt = 6;
int index_files_cnt = 7;
meta::GroupFileSchema group_file;
group_file.group_id = group.group_id;
meta::TableFileSchema table_file;
table_file.table_id = group.table_id;
for (auto i=0; i<new_files_cnt; ++i) {
status = impl_->add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::NEW;
status = impl_->update_group_file(group_file);
status = impl_->CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::NEW;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<raw_files_cnt; ++i) {
status = impl_->add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::RAW;
status = impl_->update_group_file(group_file);
status = impl_->CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::RAW;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<to_index_files_cnt; ++i) {
status = impl_->add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::TO_INDEX;
status = impl_->update_group_file(group_file);
status = impl_->CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::TO_INDEX;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<index_files_cnt; ++i) {
status = impl_->add_group_file(group_file);
group_file.file_type = meta::GroupFileSchema::INDEX;
status = impl_->update_group_file(group_file);
status = impl_->CreateTableFile(table_file);
table_file.file_type = meta::TableFileSchema::INDEX;
status = impl_->UpdateTableFile(table_file);
}
meta::GroupFilesSchema files;
meta::TableFilesSchema files;
status = impl_->files_to_index(files);
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatePartionedGroupFilesSchema dated_files;
status = impl_->files_to_merge(group.group_id, dated_files);
meta::DatePartionedTableFilesSchema dated_files;
status = impl_->FilesToMerge(group.table_id, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[group_file.date].size(), raw_files_cnt);
ASSERT_EQ(dated_files[table_file.date].size(), raw_files_cnt);
status = impl_->files_to_index(files);
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatesT dates = {group_file.date};
status = impl_->files_to_search(group_id, dates, dated_files);
meta::DatesT dates = {table_file.date};
status = impl_->FilesToSearch(table_id, dates, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[group_file.date].size(),
ASSERT_EQ(dated_files[table_file.date].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
}

View File

@ -59,5 +59,5 @@ void MetaTest::SetUp() {
}
void MetaTest::TearDown() {
impl_->drop_all();
impl_->DropAll();
}