refine code

Former-commit-id: af1b487c2c21aba34ac91b8f5df4ce97a5836919
Author: groot
Date: 2019-06-18 19:31:33 +08:00
parent cd2f7d57ff
commit 6a6673f1b9
15 changed files with 276 additions and 422 deletions

View File

@ -8,7 +8,6 @@ server_config:
db_config:
db_path: /tmp/milvus
db_backend_url: http://127.0.0.1
db_flush_interval: 5 #flush cache data into disk at intervals, unit: second
index_building_threshold: 1024 #build index file when raw data file size is larger than this value, unit: MB
metric_config:
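Note: with db_flush_interval gone, the flush cadence is no longer user-configurable; the built-in memory_sync_interval default (1 second, see Options.h further down) drives the timer instead. Assuming the remaining keys are unchanged, the trimmed db_config section reads:

db_config:
    db_path: /tmp/milvus
    db_backend_url: http://127.0.0.1
    index_building_threshold: 1024 #build index file when raw data file size is larger than this value, unit: MB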

View File

@ -6,7 +6,6 @@
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "Factories.h"
namespace zilliz {

View File

@ -5,7 +5,6 @@
******************************************************************************/
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "Log.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
@ -125,13 +124,12 @@ void CalcScore(uint64_t vector_count,
DBImpl::DBImpl(const Options& options)
: env_(options.env),
options_(options),
bg_compaction_scheduled_(false),
: options_(options),
shutting_down_(false),
bg_build_index_started_(false),
pMeta_(new meta::DBMetaImpl(options_.meta)),
pMemMgr_(new MemManager(pMeta_, options_)) {
pMemMgr_(new MemManager(pMeta_, options_)),
compact_thread_pool_(1, 1),
index_thread_pool_(1, 1) {
StartTimerTasks(options_.memory_sync_interval);
}
@ -140,28 +138,10 @@ Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
}
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToDelete(table_id, dates, files);
if (!status.ok()) { return status; }
for (auto &day_files : files) {
for (auto &file : day_files.second) {
boost::filesystem::remove(file.location_);
}
}
//an empty dates list means delete all files of the table
if(dates.empty()) {
meta::TableSchema table_schema;
table_schema.table_id_ = table_id;
status = DescribeTable(table_schema);
pMeta_->DeleteTable(table_id);
boost::system::error_code ec;
boost::filesystem::remove_all(table_schema.location_, ec);
if(ec.failed()) {
ENGINE_LOG_WARNING << "Failed to remove table folder";
}
pMemMgr_->EraseMemVector(table_id); //not allow insert
pMeta_->DeleteTable(table_id); //soft delete
}
return Status::OK();
@ -405,7 +385,15 @@ void DBImpl::BackgroundTimerTask(int interval) {
server::SystemInfo::GetInstance().Init();
while (true) {
if (!bg_error_.ok()) break;
if (shutting_down_.load(std::memory_order_acquire)) break;
if (shutting_down_.load(std::memory_order_acquire)){
for(auto& iter : compact_thread_results_) {
iter.wait();
}
for(auto& iter : index_thread_results_) {
iter.wait();
}
break;
}
std::this_thread::sleep_for(std::chrono::seconds(interval));
@ -421,33 +409,34 @@ void DBImpl::BackgroundTimerTask(int interval) {
server::Metrics::GetInstance().GPUPercentGaugeSet();
server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
server::Metrics::GetInstance().OctetsSet();
TrySchedule();
StartCompactionTask();
StartBuildIndexTask();
}
}
void DBImpl::TrySchedule() {
if (bg_compaction_scheduled_) return;
if (!bg_error_.ok()) return;
void DBImpl::StartCompactionTask() {
//serialize memory data
std::vector<std::string> temp_table_ids;
pMemMgr_->Serialize(temp_table_ids);
for(auto& id : temp_table_ids) {
compact_table_ids_.insert(id);
}
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
//has the previous compaction task finished?
if(!compact_thread_results_.empty()) {
std::chrono::milliseconds span(10);
if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
compact_thread_results_.pop_back();
}
}
void DBImpl::BGWork(void* db_) {
reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
}
void DBImpl::BackgroundCall() {
std::lock_guard<std::mutex> lock(mutex_);
assert(bg_compaction_scheduled_);
if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
return ;
BackgroundCompaction();
bg_compaction_scheduled_ = false;
bg_work_finish_signal_.notify_all();
//add new compaction task
if(compact_thread_results_.empty()) {
compact_thread_results_.push_back(
compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
compact_table_ids_.clear();
}
}
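StartCompactionTask keeps at most one compaction in flight: poll the last future for 10 ms, reap it if ready, and enqueue a new run only when the slot is empty; ids gathered into compact_table_ids_ ride along with the next run. A standalone sketch of the same one-slot pattern (std::async stands in for the thread pool; all names here are hypothetical):

#include <chrono>
#include <future>
#include <list>
#include <set>
#include <string>

std::list<std::future<void>> results;   // holds at most one in-flight task
std::set<std::string> pending_ids;      // ids waiting for the next run

void TrySchedule() {
    // reap the previous run if it has finished
    if (!results.empty() &&
        results.back().wait_for(std::chrono::milliseconds(10)) == std::future_status::ready) {
        results.pop_back();
    }
    // start a new run only when the single slot is free
    if (results.empty()) {
        results.push_back(std::async(std::launch::async,
                                     [ids = pending_ids] { /* compact every table in ids */ }));
        pending_ids.clear();
    }
}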
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
@ -512,7 +501,6 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
bool has_merge = false;
for (auto& kv : raw_files) {
auto files = kv.second;
if (files.size() <= options_.merge_trigger_number) {
@ -520,15 +508,43 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
has_merge = true;
MergeFiles(table_id, kv.first, kv.second);
if (shutting_down_.load(std::memory_order_acquire)){
break;
}
}
return Status::OK();
}
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
Status status;
for (auto table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
bg_error_ = status;
return;
}
}
pMeta_->Archive();
TryBuildIndex();
pMeta_->CleanUpFilesWithTTL(1);
}
return Status::OK();
void DBImpl::StartBuildIndexTask() {
//has the previous index-build task finished?
if(!index_thread_results_.empty()) {
std::chrono::milliseconds span(10);
if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
index_thread_results_.pop_back();
}
}
//add new build index task
if(index_thread_results_.empty()) {
index_thread_results_.push_back(
index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
}
}
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
@ -569,8 +585,6 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
}
void DBImpl::BackgroundBuildIndex() {
std::lock_guard<std::mutex> lock(build_index_mutex_);
assert(bg_build_index_started_);
meta::TableFilesSchema to_index_files;
pMeta_->FilesToIndex(to_index_files);
Status status;
@ -581,34 +595,12 @@ void DBImpl::BackgroundBuildIndex() {
bg_error_ = status;
return;
}
}
/* LOG(DEBUG) << "All index building done"; */
bg_build_index_started_ = false;
bg_build_index_finish_signal_.notify_all();
}
Status DBImpl::TryBuildIndex() {
if (bg_build_index_started_) return Status::OK();
if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
bg_build_index_started_ = true;
std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
build_index_task.detach();
return Status::OK();
}
void DBImpl::BackgroundCompaction() {
std::vector<std::string> table_ids;
pMemMgr_->Serialize(table_ids);
Status status;
for (auto table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
bg_error_ = status;
return;
if (shutting_down_.load(std::memory_order_acquire)){
break;
}
}
/* LOG(DEBUG) << "All index building done"; */
}
Status DBImpl::DropAll() {
@ -620,23 +612,10 @@ Status DBImpl::Size(uint64_t& result) {
}
DBImpl::~DBImpl() {
{
std::unique_lock<std::mutex> lock(mutex_);
shutting_down_.store(true, std::memory_order_release);
while (bg_compaction_scheduled_) {
bg_work_finish_signal_.wait(lock);
}
}
{
std::unique_lock<std::mutex> lock(build_index_mutex_);
while (bg_build_index_started_) {
bg_build_index_finish_signal_.wait(lock);
}
}
shutting_down_.store(true, std::memory_order_release);
bg_timer_thread_.join();
std::vector<std::string> ids;
pMemMgr_->Serialize(ids);
env_->Stop();
}
} // namespace engine

View File

@ -8,12 +8,15 @@
#include "DB.h"
#include "MemManager.h"
#include "Types.h"
#include "utils/ThreadPool.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
#include <thread>
#include <list>
#include <set>
namespace zilliz {
namespace milvus {
@ -62,40 +65,37 @@ private:
const float* vectors, const meta::DatesT& dates, QueryResults& results);
void BackgroundBuildIndex();
Status BuildIndex(const meta::TableFileSchema&);
Status TryBuildIndex();
Status MergeFiles(const std::string& table_id,
const meta::DateT& date,
const meta::TableFilesSchema& files);
Status BackgroundMergeFiles(const std::string& table_id);
void TrySchedule();
void StartTimerTasks(int interval);
void BackgroundTimerTask(int interval);
static void BGWork(void* db);
void BackgroundCall();
void BackgroundCompaction();
void StartCompactionTask();
Status MergeFiles(const std::string& table_id,
const meta::DateT& date,
const meta::TableFilesSchema& files);
Status BackgroundMergeFiles(const std::string& table_id);
void BackgroundCompaction(std::set<std::string> table_ids);
void StartBuildIndexTask();
void BackgroundBuildIndex();
Status BuildIndex(const meta::TableFileSchema&);
Env* const env_;
const Options options_;
std::mutex mutex_;
std::condition_variable bg_work_finish_signal_;
bool bg_compaction_scheduled_;
Status bg_error_;
std::atomic<bool> shutting_down_;
std::mutex build_index_mutex_;
bool bg_build_index_started_;
std::condition_variable bg_build_index_finish_signal_;
std::thread bg_timer_thread_;
MetaPtr pMeta_;
MemManagerPtr pMemMgr_;
server::ThreadPool compact_thread_pool_;
std::list<std::future<void>> compact_thread_results_;
std::set<std::string> compact_table_ids_;
server::ThreadPool index_thread_pool_;
std::list<std::future<void>> index_thread_results_;
}; // DBImpl
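DBImpl.h now includes utils/ThreadPool.h and swaps the Env/condition-variable plumbing for two single-thread pools plus future lists, so compaction and index building each run serialized but independently of one another. The pool itself is not part of this diff; a minimal pool matching the usage here (a ThreadPool(threads, queue_size) constructor and enqueue() returning a std::future) could look roughly like the sketch below — an assumption, not the real utils/ThreadPool.h:

#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <vector>

class ThreadPool {
public:
    ThreadPool(size_t threads, size_t /*queue_size: ignored in this sketch*/) {
        for (size_t i = 0; i < threads; ++i) {
            workers_.emplace_back([this] {
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mutex_);
                        cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) return;
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task(); // run outside the lock
                }
            });
        }
    }

    // enqueue(&DBImpl::BackgroundCompaction, this, ids) works because std::bind
    // binds a member-function pointer to its object and arguments.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using R = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<R()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<R> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.emplace([task] { (*task)(); });
        }
        cv_.notify_one();
        return result;
    }

    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& worker : workers_) worker.join();
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
};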

View File

@ -29,24 +29,43 @@ using namespace sqlite_orm;
namespace {
void HandleException(std::exception &e) {
ENGINE_LOG_DEBUG << "Engine meta exception: " << e.what();
throw e;
Status HandleException(const std::string& desc, std::exception &e) {
ENGINE_LOG_ERROR << desc << ": " << e.what();
return Status::DBTransactionError(desc, e.what());
}
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
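MetricCollector turns the start-time/end-time bookkeeping that used to be copied into every method into RAII: the constructor bumps the access counter, and the destructor observes the elapsed time even when an exception unwinds the scope. Combined with the Status-returning HandleException, the typical method body collapses to this shape (illustrative only; SomeMetaOp is not a method from this file):

Status DBMetaImpl::SomeMetaOp() {
    try {
        MetricCollector metric; // counts the access now, reports duration at scope exit
        // ... sqlite_orm queries/updates go here ...
    } catch (std::exception &e) {
        return HandleException("Encounter exception when doing some meta op", e);
    }
    return Status::OK();
}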
inline auto StoragePrototype(const std::string &path) {
return make_storage(path,
make_table("Table",
make_table("Tables",
make_column("id", &TableSchema::id_, primary_key()),
make_column("table_id", &TableSchema::table_id_, unique()),
make_column("state", &TableSchema::state_),
make_column("dimension", &TableSchema::dimension_),
make_column("created_on", &TableSchema::created_on_),
make_column("files_cnt", &TableSchema::files_cnt_, default_value(0)),
make_column("engine_type", &TableSchema::engine_type_),
make_column("store_raw_data", &TableSchema::store_raw_data_)),
make_table("TableFile",
make_table("TableFiles",
make_column("id", &TableFileSchema::id_, primary_key()),
make_column("table_id", &TableFileSchema::table_id_),
make_column("engine_type", &TableFileSchema::engine_type_),
@ -109,9 +128,9 @@ Status DBMetaImpl::Initialize() {
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << options_.path << " Error";
ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
return Status::DBTransactionError("Failed to create db directory", options_.path);
}
assert(ret);
}
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));
@ -139,15 +158,15 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
return status;
}
auto yesterday = GetDateWithDelta(-1);
for (auto &date : dates) {
if (date >= yesterday) {
return Status::Error("Cannot delete partitions within 2 days");
}
}
try {
auto yesterday = GetDateWithDelta(-1);
for (auto &date : dates) {
if (date >= yesterday) {
return Status::Error("Cannot delete partitions within 2 days");
}
}
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
@ -157,40 +176,43 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
in(&TableFileSchema::date_, dates)
));
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when drop partition", e);
}
return Status::OK();
}
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
if (table_schema.table_id_ == "") {
NextTableId(table_schema.table_id_);
}
table_schema.files_cnt_ = 0;
table_schema.id_ = -1;
table_schema.created_on_ = utils::GetMicroSecTimeStamp();
auto start_time = METRICS_NOW_TIME;
{
try {
MetricCollector metric;
server::Metrics::GetInstance().MetaAccessTotalIncrement();
if (table_schema.table_id_ == "") {
NextTableId(table_schema.table_id_);
}
table_schema.files_cnt_ = 0;
table_schema.id_ = -1;
table_schema.created_on_ = utils::GetMicroSecTimeStamp();
try {
auto id = ConnectorPtr->insert(table_schema);
table_schema.id_ = id;
} catch (...) {
return Status::DBTransactionError("Add Table Error");
}
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
if (!boost::filesystem::is_directory(table_path)) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
if (!boost::filesystem::is_directory(table_path)) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
}
assert(ret);
}
assert(ret);
} catch (std::exception &e) {
return HandleException("Encounter exception when create table", e);
}
return Status::OK();
@ -198,14 +220,31 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
Status DBMetaImpl::DeleteTable(const std::string& table_id) {
try {
//drop the table from meta
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
MetricCollector metric;
//soft delete table
auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::files_cnt_,
&TableSchema::dimension_,
&TableSchema::engine_type_,
&TableSchema::store_raw_data_,
&TableSchema::created_on_),
where(c(&TableSchema::table_id_) == table_id));
for (auto &table : tables) {
ConnectorPtr->remove<TableSchema>(std::get<0>(table));
TableSchema table_schema;
table_schema.table_id_ = table_id;
table_schema.state_ = (int)TableSchema::TO_DELETE;
table_schema.id_ = std::get<0>(table);
table_schema.files_cnt_ = std::get<1>(table);
table_schema.dimension_ = std::get<2>(table);
table_schema.engine_type_ = std::get<3>(table);
table_schema.store_raw_data_ = std::get<4>(table);
table_schema.created_on_ = std::get<5>(table);
ConnectorPtr->update<TableSchema>(table_schema);
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when delete table", e);
}
return Status::OK();
@ -213,19 +252,17 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) {
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_,
&TableSchema::files_cnt_,
&TableSchema::dimension_,
&TableSchema::engine_type_,
&TableSchema::store_raw_data_),
where(c(&TableSchema::table_id_) == table_schema.table_id_));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
assert(groups.size() <= 1);
where(c(&TableSchema::table_id_) == table_schema.table_id_
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
if (groups.size() == 1) {
table_schema.id_ = std::get<0>(groups[0]);
table_schema.files_cnt_ = std::get<2>(groups[0]);
@ -240,47 +277,44 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
table_schema.location_ = table_path;
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when describe table", e);
}
return Status::OK();
}
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
has_or_not = false;
try {
MetricCollector metric;
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
where(c(&TableSchema::table_id_) == table_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
assert(tables.size() <= 1);
where(c(&TableSchema::table_id_) == table_id
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
if (tables.size() == 1) {
has_or_not = true;
} else {
has_or_not = false;
}
} catch (std::exception &e) {
HandleException(e);
HandleException("Encounter exception when lookup table", e);
}
return Status::OK();
}
Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_,
&TableSchema::files_cnt_,
&TableSchema::dimension_,
&TableSchema::engine_type_,
&TableSchema::store_raw_data_));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
&TableSchema::store_raw_data_),
where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
for (auto &table : selected) {
TableSchema schema;
schema.id_ = std::get<0>(table);
@ -292,8 +326,9 @@ Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
table_schema_array.emplace_back(schema);
}
} catch (std::exception &e) {
HandleException(e);
HandleException("Encounter exception when lookup all tables", e);
}
return Status::OK();
@ -310,37 +345,33 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
return status;
}
NextFileId(file_schema.file_id_);
file_schema.file_type_ = TableFileSchema::NEW;
file_schema.dimension_ = table_schema.dimension_;
file_schema.size_ = 0;
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
GetTableFilePath(file_schema);
ENGINE_LOG_DEBUG << "CreateTableFile " << file_schema.file_id_;
{
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
} catch (...) {
return Status::DBTransactionError("Add file Error");
}
}
try {
MetricCollector metric;
auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
NextFileId(file_schema.file_id_);
file_schema.file_type_ = TableFileSchema::NEW;
file_schema.dimension_ = table_schema.dimension_;
file_schema.size_ = 0;
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
GetTableFilePath(file_schema);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id;
auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
}
assert(ret);
} catch (std::exception& ex) {
return HandleException("Encounter exception when create table file", ex);
}
return Status::OK();
@ -350,8 +381,8 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
@ -361,9 +392,6 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
&TableFileSchema::engine_type_),
where(c(&TableFileSchema::file_type_)
== (int) TableFileSchema::TO_INDEX));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
std::map<std::string, TableSchema> groups;
TableFileSchema table_file;
@ -391,8 +419,9 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
table_file.dimension_ = groups[table_file.table_id_].dimension_;
files.push_back(table_file);
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when iterate raw files", e);
}
return Status::OK();
@ -404,8 +433,8 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
MetricCollector metric;
if (partition.empty()) {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
@ -420,9 +449,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
== (int) TableFileSchema::TO_INDEX or
c(&TableFileSchema::file_type_)
== (int) TableFileSchema::INDEX)));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
@ -464,9 +491,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
== (int) TableFileSchema::TO_INDEX or
c(&TableFileSchema::file_type_)
== (int) TableFileSchema::INDEX)));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
@ -495,7 +520,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when iterate index files", e);
}
return Status::OK();
@ -506,8 +531,8 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
@ -517,9 +542,7 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and
c(&TableFileSchema::table_id_) == table_id),
order_by(&TableFileSchema::size_).desc());
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
@ -545,7 +568,7 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
files[table_file.date_].push_back(table_file);
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when iterate merge files", e);
}
return Status::OK();
@ -563,9 +586,7 @@ Status DBMetaImpl::FilesToDelete(const std::string& table_id,
&TableFileSchema::file_id_,
&TableFileSchema::size_,
&TableFileSchema::date_),
where(c(&TableFileSchema::file_type_) !=
(int) TableFileSchema::TO_DELETE
and c(&TableFileSchema::table_id_) == table_id));
where(c(&TableFileSchema::table_id_) == table_id));
//step 2: erase table files from meta
for (auto &file : selected) {
@ -592,9 +613,7 @@ Status DBMetaImpl::FilesToDelete(const std::string& table_id,
&TableFileSchema::file_id_,
&TableFileSchema::size_,
&TableFileSchema::date_),
where(c(&TableFileSchema::file_type_) !=
(int) TableFileSchema::TO_DELETE
and in(&TableFileSchema::date_, partition)
where(in(&TableFileSchema::date_, partition)
and c(&TableFileSchema::table_id_) == table_id));
//step 2: erase table files from meta
@ -617,7 +636,7 @@ Status DBMetaImpl::FilesToDelete(const std::string& table_id,
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when iterate delete files", e);
}
return Status::OK();
@ -648,7 +667,7 @@ Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) {
" File:" + file_schema.file_id_ + " not found");
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when lookup table file", e);
}
return Status::OK();
@ -677,7 +696,7 @@ Status DBMetaImpl::Archive() {
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when update table files", e);
}
}
if (criteria == "disk") {
@ -707,7 +726,7 @@ Status DBMetaImpl::Size(uint64_t &result) {
result += (uint64_t) (*std::get<0>(sub_query));
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when calculte db size", e);
}
return Status::OK();
@ -752,7 +771,7 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
));
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when discard table file", e);
}
return DiscardFiles(to_discard_size);
@ -761,38 +780,33 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
MetricCollector metric;
ConnectorPtr->update(file_schema);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
} catch (std::exception &e) {
ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
HandleException(e);
return HandleException("Encounter exception when update table file", e);
}
return Status::OK();
}
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
MetricCollector metric;
auto commited = ConnectorPtr->transaction([&]() mutable {
for (auto &file : files) {
file.updated_time_ = utils::GetMicroSecTimeStamp();
ConnectorPtr->update(file);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
return true;
});
if (!commited) {
return Status::DBTransactionError("Update files Error");
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when update table files", e);
}
return Status::OK();
}
@ -830,7 +844,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
/* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when clean table files", e);
}
return Status::OK();
@ -868,7 +882,7 @@ Status DBMetaImpl::CleanUp() {
/* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */
}
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when clean table file", e);
}
return Status::OK();
@ -877,9 +891,8 @@ Status DBMetaImpl::CleanUp() {
Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_,
&TableFileSchema::date_),
where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
@ -888,9 +901,7 @@ Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)
and
c(&TableFileSchema::table_id_) == table_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
@ -908,7 +919,7 @@ Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
result /= sizeof(float);
} catch (std::exception &e) {
HandleException(e);
return HandleException("Encounter exception when calculate table file size", e);
}
return Status::OK();
}

View File

@ -1,87 +0,0 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <easylogging++.h>
#include <assert.h>
#include <atomic>
#include "Env.h"
namespace zilliz {
namespace milvus {
namespace engine {
Env::Env()
: bg_work_started_(false),
shutting_down_(false) {
}
void Env::Schedule(void (*function)(void* arg), void* arg) {
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (shutting_down_) return;
if (!bg_work_started_) {
bg_work_started_ = true;
std::thread bg_thread(Env::BackgroundThreadEntryPoint, this);
bg_thread.detach();
}
if (bg_work_queue_.empty()) {
bg_work_cv_.notify_one();
}
bg_work_queue_.emplace(function, arg);
}
void Env::BackgroundThreadMain() {
while (!shutting_down_) {
std::unique_lock<std::mutex> lock(bg_work_mutex_);
while (bg_work_queue_.empty() && !shutting_down_) {
bg_work_cv_.wait(lock);
}
if (shutting_down_) break;
assert(!bg_work_queue_.empty());
auto bg_function = bg_work_queue_.front().function_;
void* bg_arg = bg_work_queue_.front().arg_;
bg_work_queue_.pop();
lock.unlock();
bg_function(bg_arg);
}
std::unique_lock<std::mutex> lock(bg_work_mutex_);
bg_work_started_ = false;
bg_work_cv_.notify_all();
}
void Env::Stop() {
{
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (shutting_down_ || !bg_work_started_) return;
}
shutting_down_ = true;
{
std::unique_lock<std::mutex> lock(bg_work_mutex_);
if (bg_work_queue_.empty()) {
bg_work_cv_.notify_one();
}
while (bg_work_started_) {
bg_work_cv_.wait(lock);
}
}
shutting_down_ = false;
}
Env::~Env() {}
Env* Env::Default() {
static Env env;
return &env;
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -1,56 +0,0 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <condition_variable>
#include <thread>
#include <mutex>
#include <queue>
#include <atomic>
namespace zilliz {
namespace milvus {
namespace engine {
class Env {
public:
Env();
Env(const Env&) = delete;
Env& operator=(const Env&) = delete;
void Schedule(void (*function)(void* arg), void* arg);
virtual void Stop();
virtual ~Env();
static Env* Default();
protected:
void BackgroundThreadMain();
static void BackgroundThreadEntryPoint(Env* env) {
env->BackgroundThreadMain();
}
struct BGWork {
explicit BGWork(void (*function)(void*), void* arg)
: function_(function), arg_(arg) {}
void (* const function_)(void*);
void* const arg_;
};
std::mutex bg_work_mutex_;
std::condition_variable bg_work_cv_;
std::queue<BGWork> bg_work_queue_;
bool bg_work_started_;
std::atomic<bool> shutting_down_;
}; // Env
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -142,6 +142,13 @@ Status MemManager::Serialize(std::vector<std::string>& table_ids) {
return Status::OK();
}
Status MemManager::EraseMemVector(const std::string& table_id) {
std::unique_lock<std::mutex> lock(mutex_);
memMap_.erase(table_id);
return Status::OK();
}
} // namespace engine
} // namespace milvus
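EraseMemVector exists so DeleteTable can discard a table's in-memory vectors before the metadata row is soft-deleted; the call order in DBImpl::DeleteTable (earlier in this commit) is:

pMemMgr_->EraseMemVector(table_id); // stop accepting inserts for this table
pMeta_->DeleteTable(table_id);      // soft delete; files are swept later by the TTL cleanup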

View File

@ -75,6 +75,8 @@ public:
Status Serialize(std::vector<std::string>& table_ids);
Status EraseMemVector(const std::string& table_id);
private:
Status InsertVectorsNoLock(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);

View File

@ -21,12 +21,18 @@ const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;
struct TableSchema {
size_t id_;
typedef enum {
NORMAL,
TO_DELETE,
} TABLE_STATE;
size_t id_ = 0;
std::string table_id_;
int state_ = (int)NORMAL;
size_t files_cnt_ = 0;
uint16_t dimension_;
uint16_t dimension_ = 0;
std::string location_;
long created_on_;
long created_on_ = 0;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
bool store_raw_data_ = false;
}; // TableSchema
@ -40,17 +46,17 @@ struct TableFileSchema {
TO_DELETE,
} FILE_TYPE;
size_t id_;
size_t id_ = 0;
std::string table_id_;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
std::string file_id_;
int file_type_ = NEW;
size_t size_;
size_t size_ = 0;
DateT date_ = EmptyDate;
uint16_t dimension_;
uint16_t dimension_ = 0;
std::string location_;
long updated_time_;
long created_on_;
long updated_time_ = 0;
long created_on_ = 0;
}; // TableFileSchema
typedef std::vector<TableFileSchema> TableFilesSchema;

View File

@ -9,7 +9,6 @@
#include <boost/algorithm/string.hpp>
#include "Options.h"
#include "Env.h"
#include "DBMetaImpl.h"
#include "Exception.h"
@ -17,8 +16,7 @@ namespace zilliz {
namespace milvus {
namespace engine {
Options::Options()
: env(Env::Default()) {
Options::Options() {
}
ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) {

View File

@ -47,7 +47,6 @@ struct Options {
uint16_t memory_sync_interval = 1; //unit: second
uint16_t merge_trigger_number = 2;
size_t index_trigger_size = ONE_GB; //unit: byte
Env* env;
DBMetaOptions meta;
}; // Options

View File

@ -20,7 +20,7 @@ namespace {
static constexpr int64_t TOTAL_ROW_COUNT = 100000;
static constexpr int64_t TOP_K = 10;
static constexpr int64_t SEARCH_TARGET = 5000; //changing this value produces different search results
static constexpr int64_t ADD_VECTOR_LOOP = 1;
static constexpr int64_t ADD_VECTOR_LOOP = 2;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
@ -179,8 +179,8 @@ ClientTest::Test(const std::string& address, const std::string& port) {
}
{//search vectors
std::cout << "Waiting data persist. Sleep 10 seconds ..." << std::endl;
sleep(10);
std::cout << "Waiting data persist. Sleep 1 seconds ..." << std::endl;
sleep(1);
std::vector<RowRecord> record_array;
BuildVectors(SEARCH_TARGET, SEARCH_TARGET + 10, record_array);

View File

@ -9,7 +9,6 @@
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "db/DB.h"
#include "db/Env.h"
#include "db/Meta.h"
#include "version.h"
@ -34,7 +33,6 @@ namespace {
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
opt.memory_sync_interval = (uint16_t)config.GetInt32Value(CONFIG_DB_FLUSH_INTERVAL, 10);
opt.meta.path = db_path + "/db";
int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
if(index_size > 0) {//ensure larger than zero, unit is MB

View File

@ -23,7 +23,6 @@ static const std::string CONFIG_SERVER_MODE = "server_mode";
static const std::string CONFIG_DB = "db_config";
static const std::string CONFIG_DB_URL = "db_backend_url";
static const std::string CONFIG_DB_PATH = "db_path";
static const std::string CONFIG_DB_FLUSH_INTERVAL = "db_flush_interval";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_LOG = "log_config";