diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 1ebe05eb21..e88a80fb33 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -40,6 +40,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-85 - add NetIO metric - MS-96 - add new query interface for specified files - MS-97 - Add S3 SDK for MinIO Storage +- MS-105 - Add MySQL ## Task - MS-74 - Change README.md in cpp diff --git a/cpp/README.md b/cpp/README.md index 0c7706df23..94bf2241ec 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -1,13 +1,22 @@ ### Compilation #### Step 1: install necessery tools + Install MySQL + centos7 : - yum install gfortran flex bison + yum install gfortran qt4 flex bison mysql-devel ubuntu16.04 : - sudo apt-get install gfortran flex bison + sudo apt-get install gfortran qt4-qmake flex bison libmysqlclient-dev + +If `libmysqlclient_r.so` does not exist after installing MySQL Development Files, you need to create a symbolic link: + +``` +sudo ln -s /path/to/libmysqlclient.so /path/to/libmysqlclient_r.so +``` #### Step 2: build(output to cmake_build folder) + cmake_build/src/milvus_server is the server cmake_build/src/libmilvus_engine.a is the static library diff --git a/cpp/cmake/DefineOptions.cmake b/cpp/cmake/DefineOptions.cmake index d95e7c7ed1..147663d0db 100644 --- a/cpp/cmake/DefineOptions.cmake +++ b/cpp/cmake/DefineOptions.cmake @@ -93,6 +93,8 @@ define_option(MILVUS_WITH_SQLITE "Build with SQLite library" ON) define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON) +define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON) + define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" ON) define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON) diff --git a/cpp/cmake/ThirdPartyPackages.cmake b/cpp/cmake/ThirdPartyPackages.cmake index cb5f3532fe..99f34682e1 100644 --- a/cpp/cmake/ThirdPartyPackages.cmake +++ b/cpp/cmake/ThirdPartyPackages.cmake @@ -26,6 +26,7 @@ set(MILVUS_THIRDPARTY_DEPENDENCIES JSONCONS LAPACK Lz4 + MySQLPP OpenBLAS Prometheus RocksDB @@ -56,12 +57,14 @@ macro(build_dependency DEPENDENCY_NAME) build_easyloggingpp() elseif("${DEPENDENCY_NAME}" STREQUAL "FAISS") build_faiss() + elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest") + build_gtest() elseif("${DEPENDENCY_NAME}" STREQUAL "LAPACK") build_lapack() elseif("${DEPENDENCY_NAME}" STREQUAL "Lz4") build_lz4() - elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest") - build_gtest() + elseif ("${DEPENDENCY_NAME}" STREQUAL "MySQLPP") + build_mysqlpp() elseif ("${DEPENDENCY_NAME}" STREQUAL "JSONCONS") build_jsoncons() elseif ("${DEPENDENCY_NAME}" STREQUAL "OpenBLAS") @@ -265,6 +268,12 @@ else() set(LZ4_SOURCE_URL "https://github.com/lz4/lz4/archive/${LZ4_VERSION}.tar.gz") endif() +if(DEFINED ENV{MILVUS_MYSQLPP_URL}) + set(MYSQLPP_SOURCE_URL "$ENV{MILVUS_MYSQLPP_URL}") +else() + set(MYSQLPP_SOURCE_URL "https://tangentsoft.com/mysqlpp/releases/mysql++-${MYSQLPP_VERSION}.tar.gz") +endif() + if (DEFINED ENV{MILVUS_OPENBLAS_URL}) set(OPENBLAS_SOURCE_URL "$ENV{MILVUS_OPENBLAS_URL}") else () @@ -829,8 +838,8 @@ macro(build_faiss) # ${MAKE} ${MAKE_BUILD_ARGS} BUILD_COMMAND ${MAKE} ${MAKE_BUILD_ARGS} all - COMMAND - cd gpu && make ${MAKE_BUILD_ARGS} + COMMAND + cd gpu && ${MAKE} ${MAKE_BUILD_ARGS} BUILD_IN_SOURCE 1 # INSTALL_DIR @@ -1068,6 +1077,56 @@ if(MILVUS_WITH_LZ4) include_directories(SYSTEM ${LZ4_INCLUDE_DIR}) endif() +# ---------------------------------------------------------------------- +# MySQL++ + +macro(build_mysqlpp) + message(STATUS "Building MySQL++-${MYSQLPP_VERSION} from source") + set(MYSQLPP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep") + set(MYSQLPP_INCLUDE_DIR "${MYSQLPP_PREFIX}/include") + set(MYSQLPP_SHARED_LIB + "${MYSQLPP_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}") + + set(MYSQLPP_CONFIGURE_ARGS + "--prefix=${MYSQLPP_PREFIX}" + "--enable-thread-check" + "CFLAGS=${EP_C_FLAGS}" + "CXXFLAGS=${EP_CXX_FLAGS}" + "LDFLAGS=-pthread") + + externalproject_add(mysqlpp_ep + URL + ${MYSQLPP_SOURCE_URL} + ${EP_LOG_OPTIONS} + CONFIGURE_COMMAND + "./configure" + ${MYSQLPP_CONFIGURE_ARGS} + BUILD_COMMAND + ${MAKE} ${MAKE_BUILD_ARGS} + BUILD_IN_SOURCE 1 + BUILD_BYPRODUCTS + ${MYSQLPP_SHARED_LIB}) + + file(MAKE_DIRECTORY "${MYSQLPP_INCLUDE_DIR}") + add_library(mysqlpp SHARED IMPORTED) + set_target_properties( + mysqlpp + PROPERTIES + IMPORTED_LOCATION "${MYSQLPP_SHARED_LIB}" + INTERFACE_INCLUDE_DIRECTORIES "${MYSQLPP_INCLUDE_DIR}") + + add_dependencies(mysqlpp mysqlpp_ep) + +endmacro() + +if(MILVUS_WITH_MYSQLPP) + + resolve_dependency(MySQLPP) + get_target_property(MYSQLPP_INCLUDE_DIR mysqlpp INTERFACE_INCLUDE_DIRECTORIES) + include_directories(SYSTEM "${MYSQLPP_INCLUDE_DIR}") + link_directories(SYSTEM ${MYSQLPP_PREFIX}/lib) +endif() + # ---------------------------------------------------------------------- # Prometheus diff --git a/cpp/conf/server_config.yaml b/cpp/conf/server_config.yaml index 717e9b10cb..05705bcc4c 100644 --- a/cpp/conf/server_config.yaml +++ b/cpp/conf/server_config.yaml @@ -8,7 +8,9 @@ server_config: db_config: db_path: /tmp/milvus - db_backend_url: http://127.0.0.1 + #URI format: dialect://username:password@host:port/database + #All parts except dialect are optional, but you MUST include the delimiters + db_backend_url: sqlite://:@:/ index_building_threshold: 1024 #build index file when raw data file size larger than this value, unit: MB metric_config: diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index e00420b2d1..fbed2a5e3e 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -62,6 +62,7 @@ set(s3_client_files include_directories(/usr/include) include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include") include_directories(thrift/gen-cpp) +include_directories(/usr/include/mysql) set(third_party_libs easyloggingpp @@ -83,6 +84,7 @@ set(third_party_libs snappy zlib zstd + mysqlpp ${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so ) if (MEGASEARCH_WITH_ARROW STREQUAL "ON") diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 7bf2f31211..279c2a5636 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -136,7 +136,8 @@ DBImpl::DBImpl(const Options& options) compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { meta_ptr_ = DBMetaImplFactory::Build(options.meta); - mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_)); + mem_mgr_ = std::make_shared(meta_ptr_, options_); + // mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_)); StartTimerTasks(); } @@ -466,9 +467,14 @@ void DBImpl::StartMetricTask() { } void DBImpl::StartCompactionTask() { +// static int count = 0; +// count++; +// std::cout << "StartCompactionTask: " << count << std::endl; +// std::cout << "c: " << count++ << std::endl; static uint64_t compact_clock_tick = 0; compact_clock_tick++; if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) { +// std::cout << "c r: " << count++ << std::endl; return; } @@ -574,6 +580,10 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) { } void DBImpl::BackgroundCompaction(std::set table_ids) { +// static int b_count = 0; +// b_count++; +// std::cout << "BackgroundCompaction: " << b_count << std::endl; + Status status; for (auto table_id : table_ids) { status = BackgroundMergeFiles(table_id); diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index 58265cf82f..fd005d1ff5 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -56,11 +56,11 @@ std::shared_ptr DBMetaImplFactory::Build() { std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOptions) { std::string uri = metaOptions.backend_uri; - if (uri.empty()) { - //Default to sqlite if uri is empty -// return std::make_shared(new meta::DBMetaImpl(metaOptions)); - return std::shared_ptr(new meta::DBMetaImpl(metaOptions)); - } +// if (uri.empty()) { +// //Default to sqlite if uri is empty +//// return std::make_shared(new meta::DBMetaImpl(metaOptions)); +// return std::shared_ptr(new meta::DBMetaImpl(metaOptions)); +// } std::string dialectRegex = "(.*)"; std::string usernameRegex = "(.*)"; @@ -81,12 +81,10 @@ std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOp std::string dialect = pieces_match[1].str(); std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); if (dialect.find("mysql") != std::string::npos) { -// return std::make_shared(new meta::MySQLMetaImpl(metaOptions)); - return std::shared_ptr(new meta::MySQLMetaImpl(metaOptions)); + return std::make_shared(meta::MySQLMetaImpl(metaOptions)); } else if (dialect.find("sqlite") != std::string::npos) { -// return std::make_shared(new meta::DBMetaImpl(metaOptions)); - return std::shared_ptr(new meta::DBMetaImpl(metaOptions)); + return std::make_shared(meta::DBMetaImpl(metaOptions)); } else { LOG(ERROR) << "Invalid dialect in URI: dialect = " << dialect; diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp index e8c648c93a..32ca318448 100644 --- a/cpp/src/db/MySQLMetaImpl.cpp +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "mysql++/mysql++.h" @@ -30,8 +31,8 @@ namespace meta { using namespace mysqlpp; - static std::unique_ptr connectionPtr(new Connection()); - std::recursive_mutex mysql_mutex; +// static std::unique_ptr connectionPtr(new Connection()); +// std::recursive_mutex mysql_mutex; // // std::unique_ptr& MySQLMetaImpl::getConnectionPtr() { //// static std::recursive_mutex connectionMutex_; @@ -109,7 +110,7 @@ namespace meta { Status MySQLMetaImpl::Initialize() { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); if (!boost::filesystem::is_directory(options_.path)) { auto ret = boost::filesystem::create_directory(options_.path); @@ -153,12 +154,19 @@ namespace meta { //std::cout << dbName << " " << serverAddress << " " << username << " " << password << " " << port << std::endl; // connectionPtr->set_option(new MultiStatementsOption(true)); // connectionPtr->set_option(new mysqlpp::ReconnectOption(true)); - connectionPtr->set_option(new mysqlpp::ReconnectOption(true)); - std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl; + int threadHint = std::thread::hardware_concurrency(); + int maxPoolSize = threadHint == 0 ? 8 : threadHint; + mySQLConnectionPool_ = std::make_shared(dbName, username, password, serverAddress, port, maxPoolSize); +// std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl; try { - if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) { - return Status::Error("DB connection failed: ", connectionPtr->error()); + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) { +// return Status::Error("DB connection failed: ", connectionPtr->error()); +// } + if (!connectionPtr->thread_aware()) { + ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it."; + return Status::Error("MySQL++ wasn't built with thread awareness! Can't run without it."); } CleanUp(); @@ -220,9 +228,12 @@ namespace meta { } catch (const Exception& er) { // Catch-all for any other MySQL++ exceptions return Status::DBTransactionError("GENERAL ERROR DURING INITIALIZATION", er.what()); + } catch (std::exception &e) { + return HandleException("Encounter exception during initialization", e); } } else { + ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri; return Status::Error("Wrong URI format"); } } @@ -231,7 +242,7 @@ namespace meta { Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id, const DatesT &dates) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); if (dates.size() == 0) { return Status::OK(); @@ -246,6 +257,8 @@ namespace meta { try { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + auto yesterday = GetDateWithDelta(-1); for (auto &date : dates) { @@ -284,13 +297,15 @@ namespace meta { Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); // server::Metrics::GetInstance().MetaAccessTotalIncrement(); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query createTableQuery = connectionPtr->query(); if (table_schema.table_id_.empty()) { @@ -304,7 +319,7 @@ namespace meta { if (res.num_rows() == 1) { int state = res[0]["state"]; std::string msg = (TableSchema::TO_DELETE == state) ? - "Table already exists" : "Table already exists and it is in delete state, please wait a second"; + "Table already exists and it is in delete state, please wait a second" : "Table already exists"; return Status::Error(msg); } } @@ -360,6 +375,8 @@ namespace meta { } catch (const Exception& er) { // Catch-all for any other MySQL++ exceptions return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what()); + } catch (std::exception &e) { + return HandleException("Encounter exception when create table", e); } return Status::OK(); @@ -367,12 +384,14 @@ namespace meta { Status MySQLMetaImpl::DeleteTable(const std::string& table_id) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + //soft delete table Query deleteTableQuery = connectionPtr->query(); // @@ -398,6 +417,8 @@ namespace meta { try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + //soft delete table files Query deleteTableFilesQuery = connectionPtr->query(); // @@ -423,12 +444,14 @@ namespace meta { Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query describeTableQuery = connectionPtr->query(); describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " << "FROM meta " << @@ -470,13 +493,13 @@ namespace meta { Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); try { MetricCollector metric; - + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); Query hasTableQuery = connectionPtr->query(); //since table_id is a unique column we just need to check whether it exists or not @@ -504,12 +527,14 @@ namespace meta { Status MySQLMetaImpl::AllTables(std::vector& table_schema_array) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query allTablesQuery = connectionPtr->query(); allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << "FROM meta " << @@ -548,7 +573,7 @@ namespace meta { Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); if (file_schema.date_ == EmptyDate) { file_schema.date_ = Meta::GetDate(); @@ -564,6 +589,8 @@ namespace meta { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + NextFileId(file_schema.file_id_); file_schema.file_type_ = TableFileSchema::NEW; file_schema.dimension_ = table_schema.dimension_; @@ -617,6 +644,8 @@ namespace meta { } catch (const Exception& er) { // Catch-all for any other MySQL++ exceptions return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what()); + } catch (std::exception& ex) { + return HandleException("Encounter exception when create table file", ex); } return Status::OK(); @@ -624,7 +653,7 @@ namespace meta { Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); files.clear(); @@ -632,6 +661,8 @@ namespace meta { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query filesToIndexQuery = connectionPtr->query(); filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << "FROM metaFile " << @@ -692,7 +723,7 @@ namespace meta { const DatesT &partition, DatePartionedTableFilesSchema &files) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); files.clear(); @@ -700,6 +731,8 @@ namespace meta { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + StoreQueryResult res; if (partition.empty()) { @@ -789,13 +822,15 @@ namespace meta { Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); files.clear(); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query filesToMergeQuery = connectionPtr->query(); filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " << "FROM metaFile " << @@ -858,7 +893,7 @@ namespace meta { const std::vector& ids, TableFilesSchema& table_files) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); std::stringstream idSS; for (auto& id : ids) { @@ -869,6 +904,8 @@ namespace meta { try { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query getTableFileQuery = connectionPtr->query(); getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " << "FROM metaFile " << @@ -923,7 +960,7 @@ namespace meta { // PXU TODO: Support Swap Status MySQLMetaImpl::Archive() { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); auto &criterias = options_.archive_conf.GetCriterias(); if (criterias.empty()) { @@ -936,8 +973,11 @@ namespace meta { if (criteria == "days") { size_t usecs = limit * D_SEC * US_PS; long now = utils::GetMicroSecTimeStamp(); + try { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query archiveQuery = connectionPtr->query(); archiveQuery << "UPDATE metaFile " << "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << @@ -969,11 +1009,13 @@ namespace meta { Status MySQLMetaImpl::Size(uint64_t &result) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); result = 0; try { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query getSizeQuery = connectionPtr->query(); getSizeQuery << "SELECT SUM(size) AS sum " << "FROM metaFile " << @@ -1007,7 +1049,7 @@ namespace meta { Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); if (to_discard_size <= 0) { // std::cout << "in" << std::endl; @@ -1019,6 +1061,8 @@ namespace meta { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query discardFilesQuery = connectionPtr->query(); discardFilesQuery << "SELECT id, size " << "FROM metaFile " << @@ -1074,13 +1118,15 @@ namespace meta { //ZR: this function assumes all fields in file_schema have value Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query updateTableFileQuery = connectionPtr->query(); //if the table has been deleted, just mark the table file as TO_DELETE @@ -1141,11 +1187,13 @@ namespace meta { Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query updateTableFilesQuery = connectionPtr->query(); std::map has_tables; @@ -1212,13 +1260,17 @@ namespace meta { } Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { - - std::lock_guard lock(mysql_mutex); +// static int b_count = 0; +// b_count++; +// std::cout << "CleanUpFilesWithTTL: " << b_count << std::endl; +// std::lock_guard lock(mysql_mutex); auto now = utils::GetMicroSecTimeStamp(); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " << "FROM metaFile " << @@ -1276,12 +1328,15 @@ namespace meta { try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); cleanUpFilesWithTTLQuery << "SELECT id, table_id " << "FROM meta " << "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); assert(res); +// std::cout << res.num_rows() << std::endl; std::stringstream idsToDeleteSS; for (auto& resRow : res) { size_t id = resRow["id"]; @@ -1317,9 +1372,11 @@ namespace meta { Status MySQLMetaImpl::CleanUp() { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); try { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + ENGINE_LOG_DEBUG << "Remove table file type as NEW"; Query cleanUpQuery = connectionPtr->query(); cleanUpQuery << "DELETE FROM metaFile WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";"; @@ -1341,11 +1398,13 @@ namespace meta { Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); try { MetricCollector metric; + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query countQuery = connectionPtr->query(); countQuery << "SELECT size " << "FROM metaFile " << @@ -1385,12 +1444,15 @@ namespace meta { Status MySQLMetaImpl::DropAll() { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); if (boost::filesystem::is_directory(options_.path)) { boost::filesystem::remove_all(options_.path); } try { + + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + Query dropTableQuery = connectionPtr->query(); dropTableQuery << "DROP TABLE IF EXISTS meta, metaFile;"; if (dropTableQuery.exec()) { @@ -1406,10 +1468,11 @@ namespace meta { // Catch-all for any other MySQL++ exceptions return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING TABLE", er.what()); } + return Status::OK(); } MySQLMetaImpl::~MySQLMetaImpl() { - std::lock_guard lock(mysql_mutex); +// std::lock_guard lock(mysql_mutex); CleanUp(); } diff --git a/cpp/src/db/MySQLMetaImpl.h b/cpp/src/db/MySQLMetaImpl.h index f151c2f928..033fa99453 100644 --- a/cpp/src/db/MySQLMetaImpl.h +++ b/cpp/src/db/MySQLMetaImpl.h @@ -7,6 +7,7 @@ #include "Meta.h" #include "Options.h" +#include "MySQLConnectionPool.h" #include "mysql++/mysql++.h" #include @@ -77,6 +78,9 @@ namespace meta { const DBMetaOptions options_; + std::shared_ptr mySQLConnectionPool_; + bool safe_grab = false; + // std::mutex connectionMutex_; }; // DBMetaImpl diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 269d81a498..311760948d 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -7,6 +7,7 @@ GTEST_VERSION=1.8.1 JSONCONS_VERSION=0.126.0 LAPACK_VERSION=v3.8.0 LZ4_VERSION=v1.9.1 +MYSQLPP_VERSION=3.2.4 OPENBLAS_VERSION=v0.3.6 PROMETHEUS_VERSION=v0.7.0 ROCKSDB_VERSION=v6.0.2 diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 1fa7b52ebd..0d69aba803 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -20,10 +20,10 @@ set(db_scheduler_srcs include_directories(/usr/local/cuda/include) link_directories("/usr/local/cuda/lib64") - +include_directories(/usr/include/mysql) set(db_test_src - ${unittest_srcs} + #${unittest_srcs} ${config_files} ${cache_srcs} ${db_srcs} @@ -35,8 +35,6 @@ set(db_test_src db_tests.cpp meta_tests.cpp) -include_directories(/usr/include/mysql) - cuda_add_executable(db_test ${db_test_src}) set(db_libs @@ -48,8 +46,9 @@ set(db_libs boost_system boost_filesystem lz4 + mysqlpp ) -target_link_libraries(db_test ${db_libs} ${unittest_libs} /usr/local/lib/libmysqlpp.so) +target_link_libraries(db_test ${db_libs} ${unittest_libs}) install(TARGETS db_test DESTINATION bin) diff --git a/cpp/unittest/db/MySQLMetaImpl_test.cpp b/cpp/unittest/db/MySQLMetaImpl_test.cpp index ed41df65d7..57fad28468 100644 --- a/cpp/unittest/db/MySQLMetaImpl_test.cpp +++ b/cpp/unittest/db/MySQLMetaImpl_test.cpp @@ -233,6 +233,7 @@ TEST_F(MySQLTest, table_file_TEST) { meta::TableFileSchema table_file; table_file.table_id_ = group.table_id_; status = impl.CreateTableFile(table_file); +// std::cout << status.ToString() << std::endl; ASSERT_TRUE(status.ok()); ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW); diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index ba58142860..b1738abcdf 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -286,9 +286,6 @@ TEST_F(DBTest2, DELETE_TEST) { std::this_thread::sleep_for(std::chrono::seconds(2)); ASSERT_TRUE(stat.ok()); ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_)); - - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); }; TEST_F(MySQLDBTest, DB_TEST) { @@ -371,8 +368,11 @@ TEST_F(MySQLDBTest, DB_TEST) { search.join(); - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; }; TEST_F(MySQLDBTest, SEARCH_TEST) { @@ -429,8 +429,11 @@ TEST_F(MySQLDBTest, SEARCH_TEST) { stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results); ASSERT_STATS(stat); - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; // TODO(linxj): add groundTruth assert }; @@ -466,14 +469,17 @@ TEST_F(MySQLDBTest, ARHIVE_DISK_CHECK) { std::this_thread::sleep_for(std::chrono::microseconds(1)); } - std::this_thread::sleep_for(std::chrono::seconds(10)); //change to 10 to make sure files are discarded + std::this_thread::sleep_for(std::chrono::seconds(1)); db_->Size(size); LOG(DEBUG) << "size=" << size; ASSERT_LE(size, 1 * engine::meta::G); - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; }; TEST_F(MySQLDBTest, DELETE_TEST) { @@ -484,12 +490,14 @@ TEST_F(MySQLDBTest, DELETE_TEST) { engine::meta::TableSchema table_info = BuildTableSchema(); engine::Status stat = db_->CreateTable(table_info); +// std::cout << stat.ToString() << std::endl; engine::meta::TableSchema table_info_get; table_info_get.table_id_ = TABLE_NAME; stat = db_->DescribeTable(table_info_get); ASSERT_STATS(stat); +// std::cout << "location: " << table_info_get.location_ << std::endl; ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_)); engine::IDNumbers vector_ids; @@ -509,13 +517,15 @@ TEST_F(MySQLDBTest, DELETE_TEST) { std::vector dates; stat = db_->DeleteTable(TABLE_NAME, dates); - - std::this_thread::sleep_for(std::chrono::seconds(10)); //change to 10 to make sure files are discarded - +// std::cout << "5 sec start" << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(5)); +// std::cout << "5 sec finish" << std::endl; ASSERT_TRUE(stat.ok()); -// std::cout << table_info_get.location_ << std::endl; - ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_)); +// ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_)); - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; }; diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index 36a457e173..1f2ec5399c 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -13,8 +13,27 @@ #include "db/Factories.h" #include "db/Options.h" +INITIALIZE_EASYLOGGINGPP + using namespace zilliz::milvus; +static std::string uri; + +class DBTestEnvironment : public ::testing::Environment { +public: + +// explicit DBTestEnvironment(std::string uri) : uri_(uri) {} + + static std::string getURI() { + return uri; + } + + void SetUp() override { + getURI(); + } + +}; + void ASSERT_STATS(engine::Status& stat) { ASSERT_TRUE(stat.ok()); if(!stat.ok()) { @@ -69,14 +88,23 @@ zilliz::milvus::engine::DBMetaOptions MySQLTest::getDBMetaOptions() { // engine::DBMetaOptions options = engine::DBMetaOptionsFactory::Build(path); zilliz::milvus::engine::DBMetaOptions options; options.path = "/tmp/milvus_test"; - options.backend_uri = "mysql://root:1234@:/test"; + options.backend_uri = DBTestEnvironment::getURI(); return options; - } zilliz::milvus::engine::Options MySQLDBTest::GetOptions() { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; - options.meta.backend_uri = "mysql://root:1234@:/test"; + options.meta.backend_uri = DBTestEnvironment::getURI(); return options; } + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + if (argc > 1) { + uri = argv[1]; + } +// std::cout << uri << std::endl; + ::testing::AddGlobalTestEnvironment(new DBTestEnvironment); + return RUN_ALL_TESTS(); +}