From 1a29bb36e71070b88694ae1fa70823ad49b9fb9e Mon Sep 17 00:00:00 2001 From: zhiru Date: Sun, 23 Jun 2019 15:21:59 +0800 Subject: [PATCH] update Former-commit-id: e52522b110dadee29e9d1a0d13a677c0bb26e529 --- cpp/src/db/MySQLMetaImpl.cpp | 663 +++++++++++++------------ cpp/src/db/MySQLMetaImpl.h | 13 +- cpp/unittest/db/MySQLMetaImpl_test.cpp | 75 +-- cpp/unittest/db/db_tests.cpp | 20 +- 4 files changed, 421 insertions(+), 350 deletions(-) diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp index 20734ba86f..e8c648c93a 100644 --- a/cpp/src/db/MySQLMetaImpl.cpp +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -30,7 +30,7 @@ namespace meta { using namespace mysqlpp; - std::unique_ptr connectionPtr(new Connection()); + static std::unique_ptr connectionPtr(new Connection()); std::recursive_mutex mysql_mutex; // // std::unique_ptr& MySQLMetaImpl::getConnectionPtr() { @@ -41,11 +41,29 @@ namespace meta { namespace { - void HandleException(std::exception &e) { - ENGINE_LOG_DEBUG << "Engine meta exception: " << e.what(); - throw e; + Status HandleException(const std::string& desc, std::exception &e) { + ENGINE_LOG_ERROR << desc << ": " << e.what(); + return Status::DBTransactionError(desc, e.what()); } + class MetricCollector { + public: + MetricCollector() { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + start_time_ = METRICS_NOW_TIME; + } + + ~MetricCollector() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + } + + private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + }; + } std::string MySQLMetaImpl::GetTablePath(const std::string &table_id) { @@ -93,13 +111,12 @@ namespace meta { std::lock_guard lock(mysql_mutex); - std::string path = options_.path; - if (!boost::filesystem::is_directory(path)) { - auto ret = boost::filesystem::create_directory(path); + if (!boost::filesystem::is_directory(options_.path)) { + auto ret = boost::filesystem::create_directory(options_.path); if (!ret) { - ENGINE_LOG_ERROR << "Create directory " << path << " Error"; + ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path; + return Status::DBTransactionError("Failed to create db directory", options_.path); } - assert(ret); } std::string uri = options_.backend_uri; @@ -156,6 +173,7 @@ namespace meta { InitializeQuery << "CREATE TABLE IF NOT EXISTS meta (" << "id BIGINT PRIMARY KEY AUTO_INCREMENT, " << "table_id VARCHAR(255) UNIQUE NOT NULL, " << + "state INT NOT NULL, " << "dimension SMALLINT NOT NULL, " << "created_on BIGINT NOT NULL, " << "files_cnt BIGINT DEFAULT 0 NOT NULL, " << @@ -226,16 +244,16 @@ namespace meta { return status; } - auto yesterday = GetDateWithDelta(-1); - - for (auto &date : dates) { - if (date >= yesterday) { - return Status::Error("Could not delete partitions within 2 days"); - } - } - try { + auto yesterday = GetDateWithDelta(-1); + + for (auto &date : dates) { + if (date >= yesterday) { + return Status::Error("Could not delete partitions within 2 days"); + } + } + Query dropPartitionsByDatesQuery = connectionPtr->query(); std::stringstream dateListSS; @@ -268,60 +286,80 @@ namespace meta { std::lock_guard lock(mysql_mutex); - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - if (table_schema.table_id_.empty()) { - NextTableId(table_schema.table_id_); - } - table_schema.files_cnt_ = 0; - table_schema.id_ = -1; - table_schema.created_on_ = utils::GetMicroSecTimeStamp(); - auto start_time = METRICS_NOW_TIME; - { - try { - Query createTableQuery = connectionPtr->query(); - std::string id = "NULL"; //auto-increment - std::string table_id = table_schema.table_id_; - std::string dimension = std::to_string(table_schema.dimension_); - std::string created_on = std::to_string(table_schema.created_on_); - std::string files_cnt = "0"; - std::string engine_type = std::to_string(table_schema.engine_type_); - std::string store_raw_data = table_schema.store_raw_data_ ? "true" : "false"; - createTableQuery << "INSERT INTO meta VALUES" << - "(" << id << ", " << quote << table_id << ", " << dimension << ", " << - created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data - << ");"; - if (SimpleResult res = createTableQuery.execute()) { - table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? +// server::Metrics::GetInstance().MetaAccessTotalIncrement(); + try { + + MetricCollector metric; + + Query createTableQuery = connectionPtr->query(); + + if (table_schema.table_id_.empty()) { + NextTableId(table_schema.table_id_); + } + else { + createTableQuery << "SELECT state FROM meta " << + "WHERE table_id = " << quote << table_schema.table_id_ << ";"; + StoreQueryResult res = createTableQuery.store(); + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + int state = res[0]["state"]; + std::string msg = (TableSchema::TO_DELETE == state) ? + "Table already exists" : "Table already exists and it is in delete state, please wait a second"; + return Status::Error(msg); + } + } + + table_schema.files_cnt_ = 0; + table_schema.id_ = -1; + table_schema.created_on_ = utils::GetMicroSecTimeStamp(); + +// auto start_time = METRICS_NOW_TIME; + + std::string id = "NULL"; //auto-increment + std::string table_id = table_schema.table_id_; + std::string state = std::to_string(table_schema.state_); + std::string dimension = std::to_string(table_schema.dimension_); + std::string created_on = std::to_string(table_schema.created_on_); + std::string files_cnt = "0"; + std::string engine_type = std::to_string(table_schema.engine_type_); + std::string store_raw_data = table_schema.store_raw_data_ ? "true" : "false"; + + createTableQuery << "INSERT INTO meta VALUES" << + "(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " << + created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data + << ");"; + + if (SimpleResult res = createTableQuery.execute()) { + table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? // std::cout << table_schema.id_ << std::endl; - //Consume all results to avoid "Commands out of sync" error - while (createTableQuery.more_results()) { - createTableQuery.store_next(); - } - } - else { - return Status::DBTransactionError("Add Table Error", createTableQuery.error()); - } - - } catch (const BadQuery& er) { - // Handle any query errors - return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE", er.what()); - } catch (const Exception& er) { - // Catch-all for any other MySQL++ exceptions - return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what()); + //Consume all results to avoid "Commands out of sync" error +// while (createTableQuery.more_results()) { +// createTableQuery.store_next(); +// } } - } - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - - auto table_path = GetTablePath(table_schema.table_id_); - table_schema.location_ = table_path; - if (!boost::filesystem::is_directory(table_path)) { - auto ret = boost::filesystem::create_directories(table_path); - if (!ret) { - ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + else { + return Status::DBTransactionError("Add Table Error", createTableQuery.error()); } - assert(ret); + +// auto end_time = METRICS_NOW_TIME; +// auto total_time = METRICS_MICROSECONDS(start_time, end_time); +// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + auto table_path = GetTablePath(table_schema.table_id_); + table_schema.location_ = table_path; + if (!boost::filesystem::is_directory(table_path)) { + auto ret = boost::filesystem::create_directories(table_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + return Status::Error("Failed to create table path"); + } + } + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what()); } return Status::OK(); @@ -332,14 +370,18 @@ namespace meta { std::lock_guard lock(mysql_mutex); try { - //drop the table from meta + + MetricCollector metric; + + //soft delete table Query deleteTableQuery = connectionPtr->query(); - deleteTableQuery << "DELETE FROM meta WHERE table_id = " << quote << table_id << ";"; - if (deleteTableQuery.exec()) { - return Status::OK(); - } - else { - return Status::DBTransactionError("Delete Table Error", deleteTableQuery.error()); +// + deleteTableQuery << "UPDATE meta " << + "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " << + "WHERE table_id = " << quote << table_id << ";"; + + if (!deleteTableQuery.exec()) { + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error()); } } catch (const BadQuery& er) { // Handle any query errors @@ -348,6 +390,35 @@ namespace meta { // Catch-all for any other MySQL++ exceptions return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE", er.what()); } + + return Status::OK(); + } + + Status MySQLMetaImpl::DeleteTableFiles(const std::string& table_id) { + try { + MetricCollector metric; + + //soft delete table files + Query deleteTableFilesQuery = connectionPtr->query(); + // + deleteTableFilesQuery << "UPDATE metaFile " << + "SET state = " << std::to_string(TableSchema::TO_DELETE) << ", " << + "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " + "WHERE table_id = " << quote << table_id << ";"; + + if (!deleteTableFilesQuery.exec()) { + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableFilesQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE FILES", er.what()); + } + + return Status::OK(); } Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { @@ -355,48 +426,26 @@ namespace meta { std::lock_guard lock(mysql_mutex); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + + MetricCollector metric; Query describeTableQuery = connectionPtr->query(); - describeTableQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << + describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " << "FROM meta " << - "WHERE table_id = " << quote << table_schema.table_id_ << ";"; + "WHERE table_id = " << quote << table_schema.table_id_ << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; StoreQueryResult res = describeTableQuery.store(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - -// if (!res) { -// return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING TABLE", describeTableQuery.error()); -// } assert(res && res.num_rows() <= 1); if (res.num_rows() == 1) { const Row& resRow = res[0]; -// std::string id; -// resRow["id"].to_string(id); -// table_schema.id_ = std::stoul(id); table_schema.id_ = resRow["id"]; //implicit conversion - std::string table_id; - resRow["table_id"].to_string(table_id); - table_schema.table_id_ = table_id; - -// std::string created_on; -// resRow["created_on"].to_string(created_on); -// table_schema.created_on_ = std::stol(created_on); table_schema.dimension_ = resRow["dimension"]; -// std::string files_cnt; -// resRow["files_cnt"].to_string(files_cnt); -// table_schema.files_cnt_ = std::stoul(files_cnt); table_schema.files_cnt_ = resRow["files_cnt"]; -// std::string engine_type; -// resRow["engine_type"].to_string(engine_type); -// table_schema.engine_type_ = std::stoi(engine_type); table_schema.engine_type_ = resRow["engine_type"]; table_schema.store_raw_data_ = (resRow["store_raw_data"].compare("true") == 0); @@ -424,19 +473,20 @@ namespace meta { std::lock_guard lock(mysql_mutex); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + + MetricCollector metric; + + Query hasTableQuery = connectionPtr->query(); //since table_id is a unique column we just need to check whether it exists or not - hasTableQuery << "SELECT EXISTS (SELECT 1 FROM meta WHERE table_id = " << quote << table_id << ") " - << "AS " << quote << "check" << ";"; + hasTableQuery << "SELECT EXISTS " << + "(SELECT 1 FROM meta " << + "WHERE table_id = " << quote << table_id << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << + "AS " << quote << "check" << ";"; StoreQueryResult res = hasTableQuery.store(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - assert(res && res.num_rows() == 1); int check = res[0]["check"]; has_or_not = (check == 1); @@ -457,18 +507,15 @@ namespace meta { std::lock_guard lock(mysql_mutex); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + + MetricCollector metric; Query allTablesQuery = connectionPtr->query(); allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << - "FROM meta;"; + "FROM meta " << + "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; StoreQueryResult res = allTablesQuery.store(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - for (auto& resRow : res) { TableSchema table_schema; @@ -513,68 +560,63 @@ namespace meta { return status; } - NextFileId(file_schema.file_id_); - file_schema.file_type_ = TableFileSchema::NEW; - file_schema.dimension_ = table_schema.dimension_; - file_schema.size_ = 0; - file_schema.created_on_ = utils::GetMicroSecTimeStamp(); - file_schema.updated_time_ = file_schema.created_on_; - file_schema.engine_type_ = table_schema.engine_type_; - GetTableFilePath(file_schema); + try { - { - try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + MetricCollector metric; - Query createTableFileQuery = connectionPtr->query(); - std::string id = "NULL"; //auto-increment - std::string table_id = file_schema.table_id_; - std::string engine_type = std::to_string(file_schema.engine_type_); - std::string file_id = file_schema.file_id_; - std::string file_type = std::to_string(file_schema.file_type_); - std::string size = std::to_string(file_schema.size_); - std::string updated_time = std::to_string(file_schema.updated_time_); - std::string created_on = std::to_string(file_schema.created_on_); - std::string date = std::to_string(file_schema.date_); + NextFileId(file_schema.file_id_); + file_schema.file_type_ = TableFileSchema::NEW; + file_schema.dimension_ = table_schema.dimension_; + file_schema.size_ = 0; + file_schema.created_on_ = utils::GetMicroSecTimeStamp(); + file_schema.updated_time_ = file_schema.created_on_; + file_schema.engine_type_ = table_schema.engine_type_; + GetTableFilePath(file_schema); - createTableFileQuery << "INSERT INTO metaFile VALUES" << - "(" << id << ", " << quote << table_id << ", " << engine_type << ", " << - quote << file_id << ", " << file_type << ", " << size << ", " << - updated_time << ", " << created_on << ", " << date << ");"; + Query createTableFileQuery = connectionPtr->query(); + std::string id = "NULL"; //auto-increment + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); - if (SimpleResult res = createTableFileQuery.execute()) { - file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? + createTableFileQuery << "INSERT INTO metaFile VALUES" << + "(" << id << ", " << quote << table_id << ", " << engine_type << ", " << + quote << file_id << ", " << file_type << ", " << size << ", " << + updated_time << ", " << created_on << ", " << date << ");"; - //Consume all results to avoid "Commands out of sync" error - while (createTableFileQuery.more_results()) { - createTableFileQuery.store_next(); - } - } - else { - return Status::DBTransactionError("Add file Error", createTableFileQuery.error()); - } + if (SimpleResult res = createTableFileQuery.execute()) { + file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - } catch (const BadQuery& er) { - // Handle any query errors - return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE FILE", er.what()); - } catch (const Exception& er) { - // Catch-all for any other MySQL++ exceptions - return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what()); + //Consume all results to avoid "Commands out of sync" error +// while (createTableFileQuery.more_results()) { +// createTableFileQuery.store_next(); +// } } - } - - auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); - - if (!boost::filesystem::is_directory(partition_path)) { - auto ret = boost::filesystem::create_directory(partition_path); - if (!ret) { - ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error"; + else { + return Status::DBTransactionError("Add file Error", createTableFileQuery.error()); } - assert(ret); + + auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); + + if (!boost::filesystem::is_directory(partition_path)) { + auto ret = boost::filesystem::create_directory(partition_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error"; + return Status::DBTransactionError("Failed to create partition directory"); + } + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE FILE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what()); } return Status::OK(); @@ -587,8 +629,8 @@ namespace meta { files.clear(); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + + MetricCollector metric; Query filesToIndexQuery = connectionPtr->query(); filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << @@ -596,10 +638,6 @@ namespace meta { "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";"; StoreQueryResult res = filesToIndexQuery.store(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - std::map groups; TableFileSchema table_file; for (auto& resRow : res) { @@ -659,8 +697,8 @@ namespace meta { files.clear(); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + + MetricCollector metric; StoreQueryResult res; @@ -675,9 +713,6 @@ namespace meta { "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; res = filesToSearchQuery.store(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); } else { @@ -691,17 +726,14 @@ namespace meta { partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", " filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << - "FROM metaFile " << - "WHERE table_id = " << quote << table_id << " AND " << - "date IN (" << partitionListStr << ") AND " << - "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << - "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << - "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + "FROM metaFile " << + "WHERE table_id = " << quote << table_id << " AND " << + "date IN (" << partitionListStr << ") AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; res = filesToSearchQuery.store(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); } TableSchema table_schema; @@ -762,20 +794,16 @@ namespace meta { files.clear(); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + MetricCollector metric; Query filesToMergeQuery = connectionPtr->query(); filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " << "FROM metaFile " << "WHERE table_id = " << quote << table_id << " AND " << - "file_type = " << std::to_string(TableFileSchema::RAW) << ";"; + "file_type = " << std::to_string(TableFileSchema::RAW) << " " << + "ORDER BY size DESC" << ";"; StoreQueryResult res = filesToMergeQuery.store(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - TableSchema table_schema; table_schema.table_id_ = table_id; auto status = DescribeTable(table_schema); @@ -826,40 +854,45 @@ namespace meta { return Status::OK(); } - //ZR: TODO: this function is pending to be removed, so not gonna implemented for now - Status MySQLMetaImpl::FilesToDelete(const std::string& table_id, - const DatesT& partition, - DatePartionedTableFilesSchema& files) { - -// std::lock_guard lock(mysql_mutex); - - return Status::OK(); - } - - Status MySQLMetaImpl::GetTableFile(TableFileSchema &file_schema) { + Status MySQLMetaImpl::GetTableFiles(const std::string& table_id, + const std::vector& ids, + TableFilesSchema& table_files) { std::lock_guard lock(mysql_mutex); + std::stringstream idSS; + for (auto& id : ids) { + idSS << "id = " << std::to_string(id) << " OR "; + } + std::string idStr = idSS.str(); + idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR " + try { Query getTableFileQuery = connectionPtr->query(); - getTableFileQuery << "SELECT id, table_id, file_id, file_type, size, date " << + getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " << "FROM metaFile " << - "WHERE file_id = " << quote << file_schema.file_id_ << " AND " << - "table_id = " << quote << file_schema.table_id_ << ";"; + "WHERE table_id = " << quote << table_id << " AND " << + "(" << idStr << ");"; StoreQueryResult res = getTableFileQuery.store(); - assert(res && res.num_rows() <= 1); - if (res.num_rows() == 1) { + assert(res); - const Row& resRow = res[0]; + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } - file_schema.id_ = resRow["id"]; //implicit conversion + for (auto& resRow : res) { + + TableFileSchema file_schema; - std::string table_id; - resRow["table_id"].to_string(table_id); file_schema.table_id_ = table_id; + file_schema.engine_type_ = resRow["engine_type"]; + std::string file_id; resRow["file_id"].to_string(file_id); file_schema.file_id_ = file_id; @@ -869,17 +902,19 @@ namespace meta { file_schema.size_ = resRow["size"]; file_schema.date_ = resRow["date"]; - } - else { - return Status::NotFound("Table:" + file_schema.table_id_ + - " File:" + file_schema.file_id_ + " not found"); + + file_schema.dimension_ = table_schema.dimension_; + + GetTableFilePath(file_schema); + + table_files.emplace_back(file_schema); } } catch (const BadQuery& er) { // Handle any query errors - return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING TABLE FILE", er.what()); + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING TABLE FILES", er.what()); } catch (const Exception& er) { // Catch-all for any other MySQL++ exceptions - return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING TABLE FILE", er.what()); + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING TABLE FILES", er.what()); } return Status::OK(); @@ -924,7 +959,7 @@ namespace meta { uint64_t sum = 0; Size(sum); - long long to_delete = (sum - limit * G); + auto to_delete = (sum - limit * G); DiscardFiles(to_delete); } } @@ -974,13 +1009,16 @@ namespace meta { std::lock_guard lock(mysql_mutex); - LOG(DEBUG) << "About to discard size=" << to_discard_size; if (to_discard_size <= 0) { // std::cout << "in" << std::endl; return Status::OK(); } + ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size; + try { + MetricCollector metric; + Query discardFilesQuery = connectionPtr->query(); discardFilesQuery << "SELECT id, size " << "FROM metaFile " << @@ -1013,7 +1051,8 @@ namespace meta { idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR " discardFilesQuery << "UPDATE metaFile " << - "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " << + "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " << "WHERE " << idsToDiscardStr << ";"; if (discardFilesQuery.exec()) { @@ -1039,11 +1078,27 @@ namespace meta { file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + + MetricCollector metric; Query updateTableFileQuery = connectionPtr->query(); + //if the table has been deleted, just mark the table file as TO_DELETE + //clean thread will delete the file later + updateTableFileQuery << "SELECT state FROM meta " << + "WHERE table_id = " << quote << file_schema.table_id_ << ";"; + StoreQueryResult res = updateTableFileQuery.store(); + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + int state = res[0]["state"]; + if (state == TableSchema::TO_DELETE) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + } + else { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + std::string id = std::to_string(file_schema.id_); std::string table_id = file_schema.table_id_; std::string engine_type = std::to_string(file_schema.engine_type_); @@ -1072,9 +1127,6 @@ namespace meta { return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error()); } - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); } catch (const BadQuery& er) { // Handle any query errors ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; @@ -1092,13 +1144,36 @@ namespace meta { std::lock_guard lock(mysql_mutex); try { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + MetricCollector metric; Query updateTableFilesQuery = connectionPtr->query(); + std::map has_tables; + for (auto &file_schema : files) { + + if(has_tables.find(file_schema.table_id_) != has_tables.end()) { + continue; + } + + updateTableFilesQuery << "SELECT EXISTS " << + "(SELECT 1 FROM meta " << + "WHERE table_id = " << quote << file_schema.table_id_ << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << + "AS " << quote << "check" << ";"; + StoreQueryResult res = updateTableFilesQuery.store(); + + assert(res && res.num_rows() == 1); + int check = res[0]["check"]; + has_tables[file_schema.table_id_] = (check == 1); + } + for (auto& file_schema : files) { + if(!has_tables[file_schema.table_id_]) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); + std::string id = std::to_string(file_schema.id_); std::string table_id = file_schema.table_id_; std::string engine_type = std::to_string(file_schema.engine_type_); @@ -1142,12 +1217,13 @@ namespace meta { auto now = utils::GetMicroSecTimeStamp(); try { + MetricCollector metric; Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, file_type, size, date " << + cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " << "FROM metaFile " << "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " << - "updated_time > " << std::to_string(now - seconds * US_PS) << ";"; + "updated_time < " << std::to_string(now - seconds * US_PS) << ";"; StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); assert(res); @@ -1167,17 +1243,12 @@ namespace meta { resRow["file_id"].to_string(file_id); table_file.file_id_ = file_id; - table_file.file_type_ = resRow["file_type"]; - - table_file.size_ = resRow["size"]; - table_file.date_ = resRow["date"]; GetTableFilePath(table_file); - if (table_file.file_type_ == TableFileSchema::TO_DELETE) { - boost::filesystem::remove(table_file.location_); - } + ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " << table_file.location_ << std::endl; + boost::filesystem::remove(table_file.location_); idsToDelete.emplace_back(std::to_string(table_file.id_)); } @@ -1202,6 +1273,45 @@ namespace meta { return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); } + try { + MetricCollector metric; + + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id " << + "FROM meta " << + "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; + StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + assert(res); + std::stringstream idsToDeleteSS; + for (auto& resRow : res) { + size_t id = resRow["id"]; + std::string table_id; + resRow["table_id"].to_string(table_id); + + auto table_path = GetTablePath(table_id); + + ENGINE_LOG_DEBUG << "Remove table folder: " << table_path; + boost::filesystem::remove_all(table_path); + + idsToDeleteSS << "id = " << std::to_string(id) << " OR "; + } + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpFilesWithTTLQuery << "DELETE FROM meta WHERE " << + idsToDeleteStr << ";"; + if (!cleanUpFilesWithTTLQuery.exec()) { + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", cleanUpFilesWithTTLQuery.error()); + } + + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } + return Status::OK(); } @@ -1210,65 +1320,10 @@ namespace meta { std::lock_guard lock(mysql_mutex); try { -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::file_type_, -// &TableFileSchema::size_, -// &TableFileSchema::date_), -// where( -// c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE -// or -// c(&TableFileSchema::file_type_) -// == (int) TableFileSchema::NEW)); - + ENGINE_LOG_DEBUG << "Remove table file type as NEW"; Query cleanUpQuery = connectionPtr->query(); - cleanUpQuery << "SELECT id, table_id, file_id, file_type, size, date " << - "FROM metaFile " << - "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " OR " << - "file_type = " << std::to_string(TableFileSchema::NEW) << ";"; - StoreQueryResult res = cleanUpQuery.store(); + cleanUpQuery << "DELETE FROM metaFile WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";"; - assert(res); - - TableFileSchema table_file; - std::vector idsToDelete; - - for (auto& resRow : res) { - - table_file.id_ = resRow["id"]; //implicit conversion - - std::string table_id; - resRow["table_id"].to_string(table_id); - table_file.table_id_ = table_id; - - std::string file_id; - resRow["file_id"].to_string(file_id); - table_file.file_id_ = file_id; - - table_file.file_type_ = resRow["file_type"]; - - table_file.size_ = resRow["size"]; - - table_file.date_ = resRow["date"]; - - GetTableFilePath(table_file); - - if (table_file.file_type_ == TableFileSchema::TO_DELETE) { - boost::filesystem::remove(table_file.location_); - } - - idsToDelete.emplace_back(std::to_string(table_file.id_)); - } - - std::stringstream idsToDeleteSS; - for (auto& id : idsToDelete) { - idsToDeleteSS << "id = " << id << " OR "; - } - std::string idsToDeleteStr = idsToDeleteSS.str(); - idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " - cleanUpQuery << "DELETE FROM metaFile WHERE " << - idsToDeleteStr << ";"; if (!cleanUpQuery.exec()) { return Status::DBTransactionError("Clean up Error", cleanUpQuery.error()); } @@ -1289,9 +1344,7 @@ namespace meta { std::lock_guard lock(mysql_mutex); try { - - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - auto start_time = METRICS_NOW_TIME; + MetricCollector metric; Query countQuery = connectionPtr->query(); countQuery << "SELECT size " << @@ -1302,10 +1355,6 @@ namespace meta { "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; StoreQueryResult res = countQuery.store(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - TableSchema table_schema; table_schema.table_id_ = table_id; auto status = DescribeTable(table_schema); diff --git a/cpp/src/db/MySQLMetaImpl.h b/cpp/src/db/MySQLMetaImpl.h index 8dd2f4cabf..f151c2f928 100644 --- a/cpp/src/db/MySQLMetaImpl.h +++ b/cpp/src/db/MySQLMetaImpl.h @@ -24,16 +24,20 @@ namespace meta { MySQLMetaImpl(const DBMetaOptions& options_); virtual Status CreateTable(TableSchema& table_schema) override; - virtual Status DeleteTable(const std::string& table_id) override; virtual Status DescribeTable(TableSchema& group_info_) override; virtual Status HasTable(const std::string& table_id, bool& has_or_not) override; virtual Status AllTables(std::vector& table_schema_array) override; + virtual Status DeleteTable(const std::string& table_id) override; + virtual Status DeleteTableFiles(const std::string& table_id) override; + virtual Status CreateTableFile(TableFileSchema& file_schema) override; virtual Status DropPartitionsByDates(const std::string& table_id, const DatesT& dates) override; - virtual Status GetTableFile(TableFileSchema& file_schema) override; + virtual Status GetTableFiles(const std::string& table_id, + const std::vector& ids, + TableFilesSchema& table_files) override; virtual Status UpdateTableFile(TableFileSchema& file_schema) override; @@ -46,10 +50,6 @@ namespace meta { virtual Status FilesToMerge(const std::string& table_id, DatePartionedTableFilesSchema& files) override; - virtual Status FilesToDelete(const std::string& table_id, - const DatesT& partition, - DatePartionedTableFilesSchema& files) override; - virtual Status FilesToIndex(TableFilesSchema&) override; virtual Status Archive() override; @@ -74,7 +74,6 @@ namespace meta { std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date); void GetTableFilePath(TableFileSchema& group_file); Status Initialize(); - std::unique_ptr& getConnectionPtr(); const DBMetaOptions options_; diff --git a/cpp/unittest/db/MySQLMetaImpl_test.cpp b/cpp/unittest/db/MySQLMetaImpl_test.cpp index c7fc611d7b..ed41df65d7 100644 --- a/cpp/unittest/db/MySQLMetaImpl_test.cpp +++ b/cpp/unittest/db/MySQLMetaImpl_test.cpp @@ -142,23 +142,30 @@ TEST_F(MySQLTest, core) { ASSERT_EQ(fileToMerge.table_id_, "test1"); ASSERT_EQ(fileToMerge.dimension_, 123); - meta::TableFileSchema resultTableFileSchema; - resultTableFileSchema.table_id_ = tableFileSchema.table_id_; - resultTableFileSchema.file_id_ = tableFileSchema.file_id_; - status = impl.GetTableFile(resultTableFileSchema); + meta::TableFilesSchema resultTableFilesSchema; + std::vector ids; + ids.push_back(tableFileSchema.id_); + status = impl.GetTableFiles(tableFileSchema.table_id_, ids, resultTableFilesSchema); ASSERT_TRUE(status.ok()); - ASSERT_EQ(resultTableFileSchema.id_, tableFileSchema.id_); + ASSERT_EQ(resultTableFilesSchema.size(), 1); + meta::TableFileSchema resultTableFileSchema = resultTableFilesSchema[0]; +// ASSERT_EQ(resultTableFileSchema.id_, tableFileSchema.id_); ASSERT_EQ(resultTableFileSchema.table_id_, tableFileSchema.table_id_); ASSERT_EQ(resultTableFileSchema.file_id_, tableFileSchema.file_id_); ASSERT_EQ(resultTableFileSchema.file_type_, tableFileSchema.file_type_); ASSERT_EQ(resultTableFileSchema.size_, tableFileSchema.size_); ASSERT_EQ(resultTableFileSchema.date_, tableFileSchema.date_); + ASSERT_EQ(resultTableFileSchema.engine_type_, tableFileSchema.engine_type_); + ASSERT_EQ(resultTableFileSchema.dimension_, tableFileSchema.dimension_); tableFileSchema.size_ = 234; - status = impl.CreateTable(schema2); + meta::TableSchema schema3; + schema3.table_id_ = "test3"; + schema3.dimension_ = 321; + status = impl.CreateTable(schema3); ASSERT_TRUE(status.ok()); meta::TableFileSchema tableFileSchema2; - tableFileSchema2.table_id_ = "test2"; + tableFileSchema2.table_id_ = "test3"; tableFileSchema2.size_ = 345; status = impl.CreateTableFile(tableFileSchema2); ASSERT_TRUE(status.ok()); @@ -184,18 +191,14 @@ TEST_F(MySQLTest, core) { } TEST_F(MySQLTest, GROUP_TEST) { - -// DBMetaOptions options; -// options.backend_uri = "mysql://root:1234@:/test"; -// options.path = "/tmp/vecwise_test"; - meta::MySQLMetaImpl impl(getDBMetaOptions()); + meta::MySQLMetaImpl impl(getDBMetaOptions()); + auto table_id = "meta_test_group"; meta::TableSchema group; group.table_id_ = table_id; auto status = impl.CreateTable(group); -// std::cout << status.ToString() << std::endl; ASSERT_TRUE(status.ok()); auto gid = group.id_; @@ -219,9 +222,6 @@ TEST_F(MySQLTest, GROUP_TEST) { TEST_F(MySQLTest, table_file_TEST) { -// DBMetaOptions options; -// options.backend_uri = "mysql://root:1234@:/test"; -// options.path = "/tmp/vecwise_test"; meta::MySQLMetaImpl impl(getDBMetaOptions()); auto table_id = "meta_test_group"; @@ -267,9 +267,13 @@ TEST_F(MySQLTest, table_file_TEST) { dates.push_back(table_file.date_); status = impl.DropPartitionsByDates(table_file.table_id_, dates); ASSERT_TRUE(status.ok()); - status = impl.GetTableFile(table_file); + + std::vector ids = {table_file.id_}; + meta::TableFilesSchema files; + status = impl.GetTableFiles(table_file.table_id_, ids, files); ASSERT_TRUE(status.ok()); - ASSERT_TRUE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE); + ASSERT_EQ(files.size(), 1UL); + ASSERT_TRUE(files[0].file_type_ == meta::TableFileSchema::TO_DELETE); status = impl.DropAll(); ASSERT_TRUE(status.ok()); @@ -278,12 +282,10 @@ TEST_F(MySQLTest, table_file_TEST) { TEST_F(MySQLTest, ARCHIVE_TEST_DAYS) { srand(time(0)); DBMetaOptions options = getDBMetaOptions(); -// options.path = "/tmp/vecwise_test"; int days_num = rand() % 100; std::stringstream ss; ss << "days:" << days_num; options.archive_conf = ArchiveConf("delete", ss.str()); -// options.backend_uri = "mysql://root:1234@:/test"; meta::MySQLMetaImpl impl(options); @@ -300,6 +302,7 @@ TEST_F(MySQLTest, ARCHIVE_TEST_DAYS) { auto cnt = 100; long ts = utils::GetMicroSecTimeStamp(); std::vector days; + std::vector ids; for (auto i=0; i ids; for (auto i=0; iDropAll(); + ASSERT_TRUE(stat.ok()); }; TEST_F(MySQLDBTest, DB_TEST) { @@ -367,6 +370,9 @@ TEST_F(MySQLDBTest, DB_TEST) { } search.join(); + + stat = db_->DropAll(); + ASSERT_TRUE(stat.ok()); }; TEST_F(MySQLDBTest, SEARCH_TEST) { @@ -423,6 +429,9 @@ TEST_F(MySQLDBTest, SEARCH_TEST) { stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results); ASSERT_STATS(stat); + stat = db_->DropAll(); + ASSERT_TRUE(stat.ok()); + // TODO(linxj): add groundTruth assert }; @@ -462,6 +471,9 @@ TEST_F(MySQLDBTest, ARHIVE_DISK_CHECK) { db_->Size(size); LOG(DEBUG) << "size=" << size; ASSERT_LE(size, 1 * engine::meta::G); + + stat = db_->DropAll(); + ASSERT_TRUE(stat.ok()); }; TEST_F(MySQLDBTest, DELETE_TEST) { @@ -497,7 +509,13 @@ TEST_F(MySQLDBTest, DELETE_TEST) { std::vector dates; stat = db_->DeleteTable(TABLE_NAME, dates); - std::this_thread::sleep_for(std::chrono::seconds(2)); + + std::this_thread::sleep_for(std::chrono::seconds(10)); //change to 10 to make sure files are discarded + ASSERT_TRUE(stat.ok()); +// std::cout << table_info_get.location_ << std::endl; ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_)); + + stat = db_->DropAll(); + ASSERT_TRUE(stat.ok()); };