From 6a6673f1b93abc5820f10a1a1a863a8f8649f639 Mon Sep 17 00:00:00 2001
From: groot
Date: Tue, 18 Jun 2019 19:31:33 +0800
Subject: [PATCH] refine code

Former-commit-id: af1b487c2c21aba34ac91b8f5df4ce97a5836919
---
 cpp/conf/server_config.yaml                |   1 -
 cpp/src/db/DB.cpp                          |   1 -
 cpp/src/db/DBImpl.cpp                      | 169 +++++-----
 cpp/src/db/DBImpl.h                        |  40 +--
 cpp/src/db/DBMetaImpl.cpp                  | 299 +++++++++---------
 cpp/src/db/Env.cpp                         |  87 -----
 cpp/src/db/Env.h                           |  56 ----
 cpp/src/db/MemManager.cpp                  |   7 +
 cpp/src/db/MemManager.h                    |   2 +
 cpp/src/db/MetaTypes.h                     |  22 +-
 cpp/src/db/Options.cpp                     |   4 +-
 cpp/src/db/Options.h                       |   1 -
 .../sdk/examples/simple/src/ClientTest.cpp |   6 +-
 cpp/src/server/RequestTask.cpp             |   2 -
 cpp/src/server/ServerConfig.h              |   1 -
 15 files changed, 276 insertions(+), 422 deletions(-)
 delete mode 100644 cpp/src/db/Env.cpp
 delete mode 100644 cpp/src/db/Env.h

diff --git a/cpp/conf/server_config.yaml b/cpp/conf/server_config.yaml
index 612d6bb0d8..3d3a67f965 100644
--- a/cpp/conf/server_config.yaml
+++ b/cpp/conf/server_config.yaml
@@ -8,7 +8,6 @@ server_config:

 db_config:
     db_path: /tmp/milvus
     db_backend_url: http://127.0.0.1
-    db_flush_interval: 5            #flush cache data into disk at intervals, unit: second
     index_building_threshold: 1024  #build index file when raw data file size larger than this value, unit: MB

 metric_config:
diff --git a/cpp/src/db/DB.cpp b/cpp/src/db/DB.cpp
index 31f14a90d0..ca08bbd9b4 100644
--- a/cpp/src/db/DB.cpp
+++ b/cpp/src/db/DB.cpp
@@ -6,7 +6,6 @@

 #include "DBImpl.h"
 #include "DBMetaImpl.h"
-#include "Env.h"
 #include "Factories.h"

 namespace zilliz {
diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp
index 69a76981b2..b29d2021e3 100644
--- a/cpp/src/db/DBImpl.cpp
+++ b/cpp/src/db/DBImpl.cpp
@@ -5,7 +5,6 @@
  ******************************************************************************/
 #include "DBImpl.h"
 #include "DBMetaImpl.h"
-#include "Env.h"
 #include "Log.h"
 #include "EngineFactory.h"
 #include "metrics/Metrics.h"
@@ -125,13 +124,12 @@ void CalcScore(uint64_t vector_count,

 DBImpl::DBImpl(const Options& options)
-    : env_(options.env),
-      options_(options),
-      bg_compaction_scheduled_(false),
+    : options_(options),
       shutting_down_(false),
-      bg_build_index_started_(false),
       pMeta_(new meta::DBMetaImpl(options_.meta)),
-      pMemMgr_(new MemManager(pMeta_, options_)) {
+      pMemMgr_(new MemManager(pMeta_, options_)),
+      compact_thread_pool_(1, 1),
+      index_thread_pool_(1, 1) {
     StartTimerTasks(options_.memory_sync_interval);
 }

@@ -140,28 +138,10 @@ Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
 }

 Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
-    meta::DatePartionedTableFilesSchema files;
-    auto status = pMeta_->FilesToDelete(table_id, dates, files);
-    if (!status.ok()) { return status; }
-
-    for (auto &day_files : files) {
-        for (auto &file : day_files.second) {
-            boost::filesystem::remove(file.location_);
-        }
-    }
-
     //dates empty means delete all files of the table
     if(dates.empty()) {
-        meta::TableSchema table_schema;
-        table_schema.table_id_ = table_id;
-        status = DescribeTable(table_schema);
-
-        pMeta_->DeleteTable(table_id);
-        boost::system::error_code ec;
-        boost::filesystem::remove_all(table_schema.location_, ec);
-        if(ec.failed()) {
-            ENGINE_LOG_WARNING << "Failed to remove table folder";
-        }
+        pMemMgr_->EraseMemVector(table_id); //disallow new inserts
+        pMeta_->DeleteTable(table_id); //soft delete
     }

     return Status::OK();
@@ -405,7 +385,15 @@ void DBImpl::BackgroundTimerTask(int interval) {
     server::SystemInfo::GetInstance().Init();
     while (true) {
         if (!bg_error_.ok()) break;
-        if (shutting_down_.load(std::memory_order_acquire)) break;
+        if (shutting_down_.load(std::memory_order_acquire)){
+            for(auto& iter : compact_thread_results_) {
+                iter.wait();
+            }
+            for(auto& iter : index_thread_results_) {
+                iter.wait();
+            }
+            break;
+        }

         std::this_thread::sleep_for(std::chrono::seconds(interval));
@@ -421,33 +409,34 @@ void DBImpl::BackgroundTimerTask(int interval) {
         server::Metrics::GetInstance().GPUPercentGaugeSet();
         server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
         server::Metrics::GetInstance().OctetsSet();
-        TrySchedule();
+
+        StartCompactionTask();
+        StartBuildIndexTask();
     }
 }

-void DBImpl::TrySchedule() {
-    if (bg_compaction_scheduled_) return;
-    if (!bg_error_.ok()) return;
+void DBImpl::StartCompactionTask() {
+    //serialize memory data
+    std::vector<std::string> temp_table_ids;
+    pMemMgr_->Serialize(temp_table_ids);
+    for(auto& id : temp_table_ids) {
+        compact_table_ids_.insert(id);
+    }

-    bg_compaction_scheduled_ = true;
-    env_->Schedule(&DBImpl::BGWork, this);
-}
+    //has the previous compaction task finished?
+    if(!compact_thread_results_.empty()) {
+        std::chrono::milliseconds span(10);
+        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
+            compact_thread_results_.pop_back();
+        }
+    }

-void DBImpl::BGWork(void* db_) {
-    reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
-}
-
-void DBImpl::BackgroundCall() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    assert(bg_compaction_scheduled_);
-
-    if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
-        return ;
-
-    BackgroundCompaction();
-
-    bg_compaction_scheduled_ = false;
-    bg_work_finish_signal_.notify_all();
+    //add new compaction task
+    if(compact_thread_results_.empty()) {
+        compact_thread_results_.push_back(
+                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
+        compact_table_ids_.clear();
+    }
 }

 Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
@@ -512,7 +501,6 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
     }

     bool has_merge = false;
-
     for (auto& kv : raw_files) {
         auto files = kv.second;
         if (files.size() <= options_.merge_trigger_number) {
@@ -520,15 +508,43 @@
         }
         has_merge = true;
         MergeFiles(table_id, kv.first, kv.second);
+
+        if (shutting_down_.load(std::memory_order_acquire)){
+            break;
+        }
+    }
+
+    return Status::OK();
+}
+
+void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
+    Status status;
+    for (auto table_id : table_ids) {
+        status = BackgroundMergeFiles(table_id);
+        if (!status.ok()) {
+            bg_error_ = status;
+            return;
+        }
     }

     pMeta_->Archive();
-
-    TryBuildIndex();
-
     pMeta_->CleanUpFilesWithTTL(1);
+}

-    return Status::OK();
+void DBImpl::StartBuildIndexTask() {
+    //has the previous build-index task finished?
+    if(!index_thread_results_.empty()) {
+        std::chrono::milliseconds span(10);
+        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
+            index_thread_results_.pop_back();
+        }
+    }
+
+    //add new build index task
+    if(index_thread_results_.empty()) {
+        index_thread_results_.push_back(
+                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
+    }
 }

 Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
@@ -569,8 +585,6 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
 }

 void DBImpl::BackgroundBuildIndex() {
-    std::lock_guard<std::mutex> lock(build_index_mutex_);
-    assert(bg_build_index_started_);
     meta::TableFilesSchema to_index_files;
     pMeta_->FilesToIndex(to_index_files);
     Status status;
@@ -581,34 +595,12 @@ void DBImpl::BackgroundBuildIndex() {
             bg_error_ = status;
             return;
         }
-    }
-    /* LOG(DEBUG) << "All Buiding index Done"; */
-    bg_build_index_started_ = false;
-    bg_build_index_finish_signal_.notify_all();
-}
-
-Status DBImpl::TryBuildIndex() {
-    if (bg_build_index_started_) return Status::OK();
-    if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
-    bg_build_index_started_ = true;
-    std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
-    build_index_task.detach();
-    return Status::OK();
-}
-
-void DBImpl::BackgroundCompaction() {
-    std::vector<std::string> table_ids;
-    pMemMgr_->Serialize(table_ids);
-
-    Status status;
-    for (auto table_id : table_ids) {
-        status = BackgroundMergeFiles(table_id);
-        if (!status.ok()) {
-            bg_error_ = status;
-            return;
+        if (shutting_down_.load(std::memory_order_acquire)){
+            break;
         }
     }
+
+    /* LOG(DEBUG) << "All Building index Done"; */
 }

 Status DBImpl::DropAll() {
@@ -620,23 +612,10 @@ Status DBImpl::Size(uint64_t& result) {
 }

 DBImpl::~DBImpl() {
-    {
-        std::unique_lock<std::mutex> lock(mutex_);
-        shutting_down_.store(true, std::memory_order_release);
-        while (bg_compaction_scheduled_) {
-            bg_work_finish_signal_.wait(lock);
-        }
-    }
-    {
-        std::unique_lock<std::mutex> lock(build_index_mutex_);
-        while (bg_build_index_started_) {
-            bg_build_index_finish_signal_.wait(lock);
-        }
-    }
+    shutting_down_.store(true, std::memory_order_release);
     bg_timer_thread_.join();
     std::vector<std::string> ids;
     pMemMgr_->Serialize(ids);
-    env_->Stop();
 }

 } // namespace engine
diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h
index 1a2f80601d..6457ed3e49 100644
--- a/cpp/src/db/DBImpl.h
+++ b/cpp/src/db/DBImpl.h
@@ -8,12 +8,15 @@
 #include "DB.h"
 #include "MemManager.h"
 #include "Types.h"
+#include "utils/ThreadPool.h"

 #include <mutex>
 #include <condition_variable>
 #include <memory>
 #include <atomic>
 #include <thread>
+#include <list>
+#include <set>

 namespace zilliz {
 namespace milvus {
@@ -62,40 +65,37 @@ private:
                       const float* vectors, const meta::DatesT& dates, QueryResults& results);

-    void BackgroundBuildIndex();
-    Status BuildIndex(const meta::TableFileSchema&);
-    Status TryBuildIndex();
-    Status MergeFiles(const std::string& table_id,
-            const meta::DateT& date,
-            const meta::TableFilesSchema& files);
-    Status BackgroundMergeFiles(const std::string& table_id);
-
-    void TrySchedule();
     void StartTimerTasks(int interval);
     void BackgroundTimerTask(int interval);

-    static void BGWork(void* db);
-    void BackgroundCall();
-    void BackgroundCompaction();
+    void StartCompactionTask();
+    Status MergeFiles(const std::string& table_id,
+            const meta::DateT& date,
+            const meta::TableFilesSchema& files);
+    Status BackgroundMergeFiles(const std::string& table_id);
+    void BackgroundCompaction(std::set<std::string> table_ids);
+
+    void StartBuildIndexTask();
+    void BackgroundBuildIndex();
+    Status BuildIndex(const meta::TableFileSchema&);

-    Env* const env_;
     const Options options_;

-    std::mutex mutex_;
-    std::condition_variable bg_work_finish_signal_;
-    bool bg_compaction_scheduled_;
     Status bg_error_;
     std::atomic<bool> shutting_down_;

-    std::mutex build_index_mutex_;
-    bool bg_build_index_started_;
-    std::condition_variable bg_build_index_finish_signal_;
-
     std::thread bg_timer_thread_;

     MetaPtr pMeta_;
     MemManagerPtr pMemMgr_;

+    server::ThreadPool compact_thread_pool_;
+    std::list<std::future<void>> compact_thread_results_;
+    std::set<std::string> compact_table_ids_;
+
+    server::ThreadPool index_thread_pool_;
+    std::list<std::future<void>> index_thread_results_;
+
 }; // DBImpl

diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp
index 7305060c0a..ecad07835e 100644
--- a/cpp/src/db/DBMetaImpl.cpp
+++ b/cpp/src/db/DBMetaImpl.cpp
@@ -29,24 +29,43 @@ using namespace sqlite_orm;

 namespace {

-void HandleException(std::exception &e) {
-    ENGINE_LOG_DEBUG << "Engine meta exception: " << e.what();
-    throw e;
+Status HandleException(const std::string& desc, std::exception &e) {
+    ENGINE_LOG_ERROR << desc << ": " << e.what();
+    return Status::DBTransactionError(desc, e.what());
 }

+class MetricCollector {
+public:
+    MetricCollector() {
+        server::Metrics::GetInstance().MetaAccessTotalIncrement();
+        start_time_ = METRICS_NOW_TIME;
+    }
+
+    ~MetricCollector() {
+        auto end_time = METRICS_NOW_TIME;
+        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
+        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
+    }
+
+private:
+    using TIME_POINT = std::chrono::system_clock::time_point;
+    TIME_POINT start_time_;
+};
+
 }

 inline auto StoragePrototype(const std::string &path) {
     return make_storage(path,
                         make_table("Tables",
                                    make_column("id", &TableSchema::id_, primary_key()),
                                    make_column("table_id", &TableSchema::table_id_, unique()),
+                                   make_column("state", &TableSchema::state_),
                                    make_column("dimension", &TableSchema::dimension_),
                                    make_column("created_on", &TableSchema::created_on_),
                                    make_column("files_cnt", &TableSchema::files_cnt_, default_value(0)),
                                    make_column("engine_type", &TableSchema::engine_type_),
                                    make_column("store_raw_data", &TableSchema::store_raw_data_)),
-                        make_table("TableFile",
+                        make_table("TableFiles",
                                    make_column("id", &TableFileSchema::id_, primary_key()),
                                    make_column("table_id", &TableFileSchema::table_id_),
                                    make_column("engine_type", &TableFileSchema::engine_type_),
@@ -109,9 +128,9 @@ Status DBMetaImpl::Initialize() {
     if (!boost::filesystem::is_directory(options_.path)) {
         auto ret = boost::filesystem::create_directory(options_.path);
         if (!ret) {
-            ENGINE_LOG_ERROR << "Create directory " << options_.path << " Error";
+            ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path;
+            return Status::DBTransactionError("Failed to create db directory", options_.path);
         }
-        assert(ret);
     }

     ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));
@@ -139,15 +158,15 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
         return status;
     }

-    auto yesterday = GetDateWithDelta(-1);
-
-    for (auto &date : dates) {
-        if (date >= yesterday) {
-            return Status::Error("Could not delete partitions with 2 days");
-        }
-    }
-
     try {
+        auto yesterday = GetDateWithDelta(-1);
+
+        for (auto &date : dates) {
+            if (date >= yesterday) {
+                return Status::Error("Could not delete partitions within 2 days");
+            }
+        }
+
         ConnectorPtr->update_all(
             set(
                 c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
in(&TableFileSchema::date_, dates) )); } catch (std::exception &e) { - HandleException(e); + return HandleException("Encounter exception when drop partition", e); } + return Status::OK(); } Status DBMetaImpl::CreateTable(TableSchema &table_schema) { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - if (table_schema.table_id_ == "") { - NextTableId(table_schema.table_id_); - } - table_schema.files_cnt_ = 0; - table_schema.id_ = -1; - table_schema.created_on_ = utils::GetMicroSecTimeStamp(); - auto start_time = METRICS_NOW_TIME; - { + try { + MetricCollector metric; + + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + if (table_schema.table_id_ == "") { + NextTableId(table_schema.table_id_); + } + table_schema.files_cnt_ = 0; + table_schema.id_ = -1; + table_schema.created_on_ = utils::GetMicroSecTimeStamp(); + try { auto id = ConnectorPtr->insert(table_schema); table_schema.id_ = id; } catch (...) { return Status::DBTransactionError("Add Table Error"); } - } - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - auto table_path = GetTablePath(table_schema.table_id_); - table_schema.location_ = table_path; - if (!boost::filesystem::is_directory(table_path)) { - auto ret = boost::filesystem::create_directories(table_path); - if (!ret) { - ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + auto table_path = GetTablePath(table_schema.table_id_); + table_schema.location_ = table_path; + if (!boost::filesystem::is_directory(table_path)) { + auto ret = boost::filesystem::create_directories(table_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + } + assert(ret); } - assert(ret); + + } catch (std::exception &e) { + return HandleException("Encounter exception when create table", e); } return Status::OK(); @@ -198,14 +220,31 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { Status DBMetaImpl::DeleteTable(const std::string& table_id) { try { - //drop the table from meta - auto tables = ConnectorPtr->select(columns(&TableSchema::id_), + MetricCollector metric; + + //soft delete table + auto tables = ConnectorPtr->select(columns(&TableSchema::id_, + &TableSchema::files_cnt_, + &TableSchema::dimension_, + &TableSchema::engine_type_, + &TableSchema::store_raw_data_, + &TableSchema::created_on_), where(c(&TableSchema::table_id_) == table_id)); for (auto &table : tables) { - ConnectorPtr->remove(std::get<0>(table)); + TableSchema table_schema; + table_schema.table_id_ = table_id; + table_schema.state_ = (int)TableSchema::TO_DELETE; + table_schema.id_ = std::get<0>(table); + table_schema.files_cnt_ = std::get<1>(table); + table_schema.dimension_ = std::get<2>(table); + table_schema.engine_type_ = std::get<3>(table); + table_schema.store_raw_data_ = std::get<4>(table); + table_schema.created_on_ = std::get<5>(table); + + ConnectorPtr->update(table_schema); } } catch (std::exception &e) { - HandleException(e); + return HandleException("Encounter exception when delete table", e); } return Status::OK(); @@ -213,19 +252,17 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) { Status DBMetaImpl::DescribeTable(TableSchema &table_schema) { try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + MetricCollector metric; + auto groups = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_, 
                                            &TableSchema::files_cnt_,
                                            &TableSchema::dimension_,
                                            &TableSchema::engine_type_,
                                            &TableSchema::store_raw_data_),
-                                           where(c(&TableSchema::table_id_) == table_schema.table_id_));
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
-        assert(groups.size() <= 1);
+                                           where(c(&TableSchema::table_id_) == table_schema.table_id_
+                                                 and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
+
         if (groups.size() == 1) {
             table_schema.id_ = std::get<0>(groups[0]);
             table_schema.files_cnt_ = std::get<2>(groups[0]);
@@ -240,47 +277,44 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
         table_schema.location_ = table_path;

     } catch (std::exception &e) {
-        HandleException(e);
+        return HandleException("Encounter exception when describe table", e);
     }

     return Status::OK();
 }

 Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
-    try {
-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        auto start_time = METRICS_NOW_TIME;
+    has_or_not = false;
+
+    try {
+        MetricCollector metric;
         auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
-                                           where(c(&TableSchema::table_id_) == table_id));
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
-        assert(tables.size() <= 1);
+                                           where(c(&TableSchema::table_id_) == table_id
+                                                 and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
         if (tables.size() == 1) {
             has_or_not = true;
         } else {
             has_or_not = false;
         }
+
     } catch (std::exception &e) {
-        HandleException(e);
+        HandleException("Encounter exception when lookup table", e);
     }
+
     return Status::OK();
 }

 Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
     try {
-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        auto start_time = METRICS_NOW_TIME;
+        MetricCollector metric;
+
         auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
                                                      &TableSchema::table_id_,
                                                      &TableSchema::files_cnt_,
                                                      &TableSchema::dimension_,
                                                      &TableSchema::engine_type_,
-                                                     &TableSchema::store_raw_data_));
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
+                                                     &TableSchema::store_raw_data_),
+                                             where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
         for (auto &table : selected) {
             TableSchema schema;
             schema.id_ = std::get<0>(table);
@@ -292,8 +326,9 @@ Status DBMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {

             table_schema_array.emplace_back(schema);
         }
+
     } catch (std::exception &e) {
-        HandleException(e);
+        HandleException("Encounter exception when lookup all tables", e);
     }

     return Status::OK();
@@ -310,37 +345,33 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
         return status;
     }

-    NextFileId(file_schema.file_id_);
-    file_schema.file_type_ = TableFileSchema::NEW;
-    file_schema.dimension_ = table_schema.dimension_;
-    file_schema.size_ = 0;
-    file_schema.created_on_ = utils::GetMicroSecTimeStamp();
-    file_schema.updated_time_ = file_schema.created_on_;
-    file_schema.engine_type_ = table_schema.engine_type_;
-    GetTableFilePath(file_schema);
-
     ENGINE_LOG_DEBUG << "CreateTableFile " << file_schema.file_id_;
-    {
-        try {
-            server::Metrics::GetInstance().MetaAccessTotalIncrement();
-            auto start_time = METRICS_NOW_TIME;
-            auto id = ConnectorPtr->insert(file_schema);
-            file_schema.id_ = id;
-            auto end_time = METRICS_NOW_TIME;
-            auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-            server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
-        } catch (...) {
-            return Status::DBTransactionError("Add file Error");
-        }
-    }

-    auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);

-    if (!boost::filesystem::is_directory(partition_path)) {
-        auto ret = boost::filesystem::create_directory(partition_path);
-        if (!ret) {
-            ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
+    try {
+        MetricCollector metric;
+
+        NextFileId(file_schema.file_id_);
+        file_schema.file_type_ = TableFileSchema::NEW;
+        file_schema.dimension_ = table_schema.dimension_;
+        file_schema.size_ = 0;
+        file_schema.created_on_ = utils::GetMicroSecTimeStamp();
+        file_schema.updated_time_ = file_schema.created_on_;
+        file_schema.engine_type_ = table_schema.engine_type_;
+        GetTableFilePath(file_schema);
+
+        auto id = ConnectorPtr->insert(file_schema);
+        file_schema.id_ = id;
+
+        auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
+
+        if (!boost::filesystem::is_directory(partition_path)) {
+            auto ret = boost::filesystem::create_directory(partition_path);
+            if (!ret) {
+                ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
+                return Status::DBTransactionError("Failed to create partition directory");
+            }
         }
-        assert(ret);
+
+    } catch (std::exception& ex) {
+        return HandleException("Encounter exception when create table file", ex);
     }

     return Status::OK();
 }

 Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
     files.clear();

     try {
-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        auto start_time = METRICS_NOW_TIME;
+        MetricCollector metric;
+
         auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                      &TableFileSchema::table_id_,
                                                      &TableFileSchema::file_id_,
                                                      &TableFileSchema::file_type_,
                                                      &TableFileSchema::size_,
                                                      &TableFileSchema::date_,
                                                      &TableFileSchema::engine_type_),
                                              where(c(&TableFileSchema::file_type_)
                                                    == (int) TableFileSchema::TO_INDEX));
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);

         std::map<std::string, TableSchema> groups;
         TableFileSchema table_file;
@@ -391,8 +419,9 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
             table_file.dimension_ = groups[table_file.table_id_].dimension_;
             files.push_back(table_file);
         }
+
     } catch (std::exception &e) {
-        HandleException(e);
+        return HandleException("Encounter exception when iterate raw files", e);
     }

     return Status::OK();
@@ -404,8 +433,8 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
     files.clear();

     try {
-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        auto start_time = METRICS_NOW_TIME;
+        MetricCollector metric;
+
         if (partition.empty()) {
             auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
                                                          &TableFileSchema::table_id_,
                                                          &TableFileSchema::file_id_,
                                                          &TableFileSchema::file_type_,
                                                          &TableFileSchema::size_,
                                                          &TableFileSchema::date_,
                                                          &TableFileSchema::engine_type_),
                                                  where(c(&TableFileSchema::table_id_) == table_id
                                                        and (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                                                             or
                                                             c(&TableFileSchema::file_type_)
                                                             == (int) TableFileSchema::TO_INDEX
                                                             or
                                                             c(&TableFileSchema::file_type_)
                                                             == (int) TableFileSchema::INDEX)));
-            auto end_time = METRICS_NOW_TIME;
-            auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-            server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
+
             TableSchema table_schema;
             table_schema.table_id_ = table_id;
             auto status = DescribeTable(table_schema);
@@ -464,9
+491,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id, == (int) TableFileSchema::TO_INDEX or c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX))); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + TableSchema table_schema; table_schema.table_id_ = table_id; auto status = DescribeTable(table_schema); @@ -495,7 +520,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id, } } catch (std::exception &e) { - HandleException(e); + return HandleException("Encounter exception when iterate index files", e); } return Status::OK(); @@ -506,8 +531,8 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id, files.clear(); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + MetricCollector metric; + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_, @@ -517,9 +542,7 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id, where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and c(&TableFileSchema::table_id_) == table_id), order_by(&TableFileSchema::size_).desc()); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + TableSchema table_schema; table_schema.table_id_ = table_id; auto status = DescribeTable(table_schema); @@ -545,7 +568,7 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id, files[table_file.date_].push_back(table_file); } } catch (std::exception &e) { - HandleException(e); + return HandleException("Encounter exception when iterate merge files", e); } return Status::OK(); @@ -563,9 +586,7 @@ Status DBMetaImpl::FilesToDelete(const std::string& table_id, &TableFileSchema::file_id_, &TableFileSchema::size_, &TableFileSchema::date_), - where(c(&TableFileSchema::file_type_) != - (int) TableFileSchema::TO_DELETE - and c(&TableFileSchema::table_id_) == table_id)); + where(c(&TableFileSchema::table_id_) == table_id)); //step 2: erase table files from meta for (auto &file : selected) { @@ -592,9 +613,7 @@ Status DBMetaImpl::FilesToDelete(const std::string& table_id, &TableFileSchema::file_id_, &TableFileSchema::size_, &TableFileSchema::date_), - where(c(&TableFileSchema::file_type_) != - (int) TableFileSchema::TO_DELETE - and in(&TableFileSchema::date_, partition) + where(in(&TableFileSchema::date_, partition) and c(&TableFileSchema::table_id_) == table_id)); //step 2: erase table files from meta @@ -617,7 +636,7 @@ Status DBMetaImpl::FilesToDelete(const std::string& table_id, } } catch (std::exception &e) { - HandleException(e); + return HandleException("Encounter exception when iterate delete files", e); } return Status::OK(); @@ -648,7 +667,7 @@ Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) { " File:" + file_schema.file_id_ + " not found"); } } catch (std::exception &e) { - HandleException(e); + return HandleException("Encounter exception when lookup table file", e); } return Status::OK(); @@ -677,7 +696,7 @@ Status DBMetaImpl::Archive() { c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE )); } catch (std::exception &e) { - HandleException(e); + return HandleException("Encounter exception when update table files", e); } } if (criteria == "disk") { @@ -707,7 +726,7 @@ Status 
DBMetaImpl::Size(uint64_t &result) {
             result += (uint64_t) (*std::get<0>(sub_query));
         }
     } catch (std::exception &e) {
-        HandleException(e);
+        return HandleException("Encounter exception when calculate db size", e);
     }

     return Status::OK();
@@ -752,7 +771,7 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
                 ));

     } catch (std::exception &e) {
-        HandleException(e);
+        return HandleException("Encounter exception when discard table file", e);
     }

     return DiscardFiles(to_discard_size);
@@ -761,38 +780,33 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
 Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
     file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
     try {
-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        auto start_time = METRICS_NOW_TIME;
+        MetricCollector metric;
+
         ConnectorPtr->update(file_schema);
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
+
     } catch (std::exception &e) {
         ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
-        HandleException(e);
+        return HandleException("Encounter exception when update table file", e);
     }
     return Status::OK();
 }

 Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
     try {
-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        auto start_time = METRICS_NOW_TIME;
+        MetricCollector metric;
+
         auto commited = ConnectorPtr->transaction([&]() mutable {
             for (auto &file : files) {
                 file.updated_time_ = utils::GetMicroSecTimeStamp();
                 ConnectorPtr->update(file);
             }
-            auto end_time = METRICS_NOW_TIME;
-            auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-            server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
             return true;
         });
         if (!commited) {
             return Status::DBTransactionError("Update files Error");
         }
     } catch (std::exception &e) {
-        HandleException(e);
+        return HandleException("Encounter exception when update table files", e);
     }
     return Status::OK();
 }
@@ -830,7 +844,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
             /* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */
         }
     } catch (std::exception &e) {
-        HandleException(e);
+        return HandleException("Encounter exception when clean table files", e);
     }

     return Status::OK();
@@ -868,7 +882,7 @@ Status DBMetaImpl::CleanUp() {
             /* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */
         }
     } catch (std::exception &e) {
-        HandleException(e);
+        return HandleException("Encounter exception when clean table file", e);
     }

     return Status::OK();
@@ -877,9 +891,8 @@ Status DBMetaImpl::CleanUp() {

 Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
     try {
+        MetricCollector metric;

-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        auto start_time = METRICS_NOW_TIME;
         auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_,
                                                      &TableFileSchema::date_),
                                              where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                                                     or
                                                     c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX
                                                     or
                                                     c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)
                                                    and
                                                    c(&TableFileSchema::table_id_) == table_id));
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
+
         TableSchema table_schema;
         table_schema.table_id_ = table_id;
         auto status = DescribeTable(table_schema);
@@ -908,7 +919,7 @@ Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {

         result /= sizeof(float);

     } catch (std::exception &e) {
-        HandleException(e);
+        return HandleException("Encounter exception when calculate table file size", e);
     }
     return Status::OK();
 }
diff --git a/cpp/src/db/Env.cpp b/cpp/src/db/Env.cpp
deleted file mode 100644
index 2cca7da13b..0000000000
--- a/cpp/src/db/Env.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*******************************************************************************
- * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
- * Unauthorized copying of this file, via any medium is strictly prohibited.
- * Proprietary and confidential.
- ******************************************************************************/
-#include
-#include
-#include
-#include "Env.h"
-
-namespace zilliz {
-namespace milvus {
-namespace engine {
-
-Env::Env()
-    : bg_work_started_(false),
-      shutting_down_(false) {
-}
-
-void Env::Schedule(void (*function)(void* arg), void* arg) {
-    std::unique_lock<std::mutex> lock(bg_work_mutex_);
-    if (shutting_down_) return;
-
-    if (!bg_work_started_) {
-        bg_work_started_ = true;
-        std::thread bg_thread(Env::BackgroundThreadEntryPoint, this);
-        bg_thread.detach();
-    }
-
-    if (bg_work_queue_.empty()) {
-        bg_work_cv_.notify_one();
-    }
-
-    bg_work_queue_.emplace(function, arg);
-}
-
-void Env::BackgroundThreadMain() {
-    while (!shutting_down_) {
-        std::unique_lock<std::mutex> lock(bg_work_mutex_);
-        while (bg_work_queue_.empty() && !shutting_down_) {
-            bg_work_cv_.wait(lock);
-        }
-
-        if (shutting_down_) break;
-
-        assert(!bg_work_queue_.empty());
-        auto bg_function = bg_work_queue_.front().function_;
-        void* bg_arg = bg_work_queue_.front().arg_;
-        bg_work_queue_.pop();
-
-        lock.unlock();
-        bg_function(bg_arg);
-    }
-
-    std::unique_lock<std::mutex> lock(bg_work_mutex_);
-    bg_work_started_ = false;
-    bg_work_cv_.notify_all();
-}
-
-void Env::Stop() {
-    {
-        std::unique_lock<std::mutex> lock(bg_work_mutex_);
-        if (shutting_down_ || !bg_work_started_) return;
-    }
-    shutting_down_ = true;
-    {
-        std::unique_lock<std::mutex> lock(bg_work_mutex_);
-        if (bg_work_queue_.empty()) {
-            bg_work_cv_.notify_one();
-        }
-        while (bg_work_started_) {
-            bg_work_cv_.wait(lock);
-        }
-    }
-    shutting_down_ = false;
-}
-
-Env::~Env() {}
-
-Env* Env::Default() {
-    static Env env;
-    return &env;
-}
-
-} // namespace engine
-} // namespace milvus
-} // namespace zilliz
diff --git a/cpp/src/db/Env.h b/cpp/src/db/Env.h
deleted file mode 100644
index 7818b25ef0..0000000000
--- a/cpp/src/db/Env.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*******************************************************************************
- * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
- * Unauthorized copying of this file, via any medium is strictly prohibited.
- * Proprietary and confidential.
- ******************************************************************************/
-#pragma once
-
-#include
-#include
-#include
-#include
-#include
-
-namespace zilliz {
-namespace milvus {
-namespace engine {
-
-class Env {
-public:
-    Env();
-
-    Env(const Env&) = delete;
-    Env& operator=(const Env&) = delete;
-
-    void Schedule(void (*function)(void* arg), void* arg);
-
-    virtual void Stop();
-
-    virtual ~Env();
-
-    static Env* Default();
-
-protected:
-    void BackgroundThreadMain();
-    static void BackgroundThreadEntryPoint(Env* env) {
-        env->BackgroundThreadMain();
-    }
-
-    struct BGWork {
-        explicit BGWork(void (*function)(void*), void* arg)
-            : function_(function), arg_(arg) {}
-
-        void (* const function_)(void*);
-        void* const arg_;
-    };
-
-    std::mutex bg_work_mutex_;
-    std::condition_variable bg_work_cv_;
-    std::queue<BGWork> bg_work_queue_;
-    bool bg_work_started_;
-    std::atomic<bool> shutting_down_;
-}; // Env
-
-} // namespace engine
-} // namespace milvus
-} // namespace zilliz
diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.cpp
index c8bd5de23b..31b2070b49 100644
--- a/cpp/src/db/MemManager.cpp
+++ b/cpp/src/db/MemManager.cpp
@@ -142,6 +142,13 @@ Status MemManager::Serialize(std::vector<std::string>& table_ids) {
     return Status::OK();
 }

+Status MemManager::EraseMemVector(const std::string& table_id) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    memMap_.erase(table_id);
+
+    return Status::OK();
+}
+
 } // namespace engine
 } // namespace milvus
diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h
index 395c8f48cd..1b329f971b 100644
--- a/cpp/src/db/MemManager.h
+++ b/cpp/src/db/MemManager.h
@@ -75,6 +75,8 @@ public:

     Status Serialize(std::vector<std::string>& table_ids);

+    Status EraseMemVector(const std::string& table_id);
+
 private:
     Status InsertVectorsNoLock(const std::string& table_id,
             size_t n, const float* vectors, IDNumbers& vector_ids);
diff --git a/cpp/src/db/MetaTypes.h b/cpp/src/db/MetaTypes.h
index a284d14a19..b20d3a05d0 100644
--- a/cpp/src/db/MetaTypes.h
+++ b/cpp/src/db/MetaTypes.h
@@ -21,12 +21,18 @@ const DateT EmptyDate = -1;
 typedef std::vector<DateT> DatesT;

 struct TableSchema {
-    size_t id_;
+    typedef enum {
+        NORMAL,
+        TO_DELETE,
+    } TABLE_STATE;
+
+    size_t id_ = 0;
     std::string table_id_;
+    int state_ = (int)NORMAL;
     size_t files_cnt_ = 0;
-    uint16_t dimension_;
+    uint16_t dimension_ = 0;
     std::string location_;
-    long created_on_;
+    long created_on_ = 0;
     int engine_type_ = (int)EngineType::FAISS_IDMAP;
     bool store_raw_data_ = false;
 }; // TableSchema

@@ -40,17 +46,17 @@ struct TableFileSchema {
         TO_DELETE,
     } FILE_TYPE;

-    size_t id_;
+    size_t id_ = 0;
     std::string table_id_;
     int engine_type_ = (int)EngineType::FAISS_IDMAP;
     std::string file_id_;
     int file_type_ = NEW;
-    size_t size_;
+    size_t size_ = 0;
     DateT date_ = EmptyDate;
-    uint16_t dimension_;
+    uint16_t dimension_ = 0;
     std::string location_;
-    long updated_time_;
-    long created_on_;
+    long updated_time_ = 0;
+    long created_on_ = 0;
 }; // TableFileSchema

 typedef std::vector<TableFileSchema> TableFilesSchema;
diff --git a/cpp/src/db/Options.cpp b/cpp/src/db/Options.cpp
index 74197226fe..ff6404751f 100644
--- a/cpp/src/db/Options.cpp
+++ b/cpp/src/db/Options.cpp
@@ -9,7 +9,6 @@
 #include
 #include "Options.h"
-#include "Env.h"
 #include "DBMetaImpl.h"
 #include "Exception.h"

@@ -17,8 +16,7 @@ namespace zilliz {
 namespace milvus {
 namespace engine {

-Options::Options()
-    : env(Env::Default()) {
+Options::Options() {
 }

 ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) {
diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h
index 2a7740575e..339044d9dd 100644
--- a/cpp/src/db/Options.h
+++ b/cpp/src/db/Options.h
@@ -47,7 +47,6 @@ struct Options {
     uint16_t  memory_sync_interval = 1;     //unit: second
     uint16_t  merge_trigger_number = 2;
     size_t  index_trigger_size = ONE_GB;    //unit: byte
-    Env* env;
     DBMetaOptions meta;
 }; // Options

diff --git a/cpp/src/sdk/examples/simple/src/ClientTest.cpp b/cpp/src/sdk/examples/simple/src/ClientTest.cpp
index 825923b1a4..b2ed9f3b25 100644
--- a/cpp/src/sdk/examples/simple/src/ClientTest.cpp
+++ b/cpp/src/sdk/examples/simple/src/ClientTest.cpp
@@ -20,7 +20,7 @@ namespace {
    static constexpr int64_t TOTAL_ROW_COUNT = 100000;
    static constexpr int64_t TOP_K = 10;
    static constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
-   static constexpr int64_t ADD_VECTOR_LOOP = 1;
+   static constexpr int64_t ADD_VECTOR_LOOP = 2;

 #define BLOCK_SPLITER std::cout << "===========================================" << std::endl;

@@ -179,8 +179,8 @@ ClientTest::Test(const std::string& address, const std::string& port) {
     }

     {//search vectors
-        std::cout << "Waiting data persist. Sleep 10 seconds ..." << std::endl;
-        sleep(10);
+        std::cout << "Waiting data persist. Sleep 1 second ..." << std::endl;
+        sleep(1);
         std::vector<RowRecord> record_array;
         BuildVectors(SEARCH_TARGET, SEARCH_TARGET + 10, record_array);
diff --git a/cpp/src/server/RequestTask.cpp b/cpp/src/server/RequestTask.cpp
index 6d2f5ae4c2..3b90d335ec 100644
--- a/cpp/src/server/RequestTask.cpp
+++ b/cpp/src/server/RequestTask.cpp
@@ -9,7 +9,6 @@
 #include "utils/Log.h"
 #include "utils/TimeRecorder.h"
 #include "db/DB.h"
-#include "db/Env.h"
 #include "db/Meta.h"
 #include "version.h"

@@ -34,7 +33,6 @@ namespace {
         ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
         opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
         std::string db_path = config.GetValue(CONFIG_DB_PATH);
-        opt.memory_sync_interval = (uint16_t)config.GetInt32Value(CONFIG_DB_FLUSH_INTERVAL, 10);
         opt.meta.path = db_path + "/db";
         int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
         if(index_size > 0) {//ensure larger than zero, unit is MB
diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h
index cd6a89bd70..dd7c9d2966 100644
--- a/cpp/src/server/ServerConfig.h
+++ b/cpp/src/server/ServerConfig.h
@@ -23,7 +23,6 @@ static const std::string CONFIG_SERVER_MODE = "server_mode";
 static const std::string CONFIG_DB = "db_config";
 static const std::string CONFIG_DB_URL = "db_backend_url";
 static const std::string CONFIG_DB_PATH = "db_path";
-static const std::string CONFIG_DB_FLUSH_INTERVAL = "db_flush_interval";
 static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
 static const std::string CONFIG_LOG = "log_config";
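
Note on the scheduling pattern this patch moves DBImpl to: instead of the removed Env work queue, each timer tick enqueues background work on a small thread pool, keeps the returned std::future in a list, and reaps it with a short wait_for poll so at most one task of each kind is in flight. A minimal self-contained sketch of that pattern, with std::async standing in for server::ThreadPool::enqueue (the pool's real interface is not part of this diff):

#include <chrono>
#include <future>
#include <iostream>
#include <list>
#include <set>
#include <string>

// Stand-in for DBImpl::BackgroundCompaction(std::set<std::string>).
static void BackgroundCompaction(std::set<std::string> table_ids) {
    for (const auto& id : table_ids) {
        std::cout << "compacting " << id << "\n";
    }
}

int main() {
    std::list<std::future<void>> compact_results;
    std::set<std::string> compact_ids = {"table_1", "table_2"};

    // Reap the previous task only if it already finished: poll for at most 10ms,
    // never block the timer loop.
    if (!compact_results.empty() &&
        compact_results.back().wait_for(std::chrono::milliseconds(10)) == std::future_status::ready) {
        compact_results.pop_back();
    }

    // Enqueue new work only when nothing is in flight, mirroring the
    // single-worker pool (1 thread) DBImpl constructs.
    if (compact_results.empty()) {
        compact_results.push_back(std::async(std::launch::async, BackgroundCompaction, compact_ids));
        compact_ids.clear();
    }

    // Shutdown path: drain all outstanding tasks, as the timer loop now does
    // when shutting_down_ is observed.
    for (auto& result : compact_results) {
        result.wait();
    }
    return 0;
}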
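Similarly, the MetricCollector introduced in DBMetaImpl.cpp is plain RAII: construction counts the access and records a start time, destruction reports the elapsed time, so every exit path of a meta call — including the new early returns from HandleException — is measured exactly once. A sketch of the same idea using a hypothetical ScopedTimer that prints instead of observing the metrics histogram:

#include <chrono>
#include <iostream>

// Hypothetical stand-in for DBMetaImpl's MetricCollector.
class ScopedTimer {
public:
    ScopedTimer() : start_(std::chrono::system_clock::now()) {}

    ~ScopedTimer() {
        // Runs on every scope exit, including exceptions and early returns.
        auto end = std::chrono::system_clock::now();
        auto us = std::chrono::duration_cast<std::chrono::microseconds>(end - start_).count();
        std::cout << "meta access took " << us << " us\n";  // real code observes a histogram
    }

private:
    std::chrono::system_clock::time_point start_;
};

int main() {
    ScopedTimer timer;  // measures the whole scope below
    // ... meta operation would run here ...
    return 0;
}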