diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 23409def53..ac3b2c51f6 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -59,7 +59,9 @@ Please mark all change in change log and use the ticket from JIRA. - MS-85 - add NetIO metric - MS-96 - add new query interface for specified files - MS-97 - Add S3 SDK for MinIO Storage +- MS-105 - Add MySQL - MS-130 - Add prometheus_test + ## Task - MS-74 - Change README.md in cpp - MS-88 - Add support for arm architecture diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c3a5957045..947759f793 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -113,20 +113,13 @@ link_directories(${MILVUS_BINARY_DIR}) set(MILVUS_ENGINE_INCLUDE ${PROJECT_SOURCE_DIR}/include) set(MILVUS_ENGINE_SRC ${PROJECT_SOURCE_DIR}/src) -#set(MILVUS_THIRD_PARTY ${CMAKE_CURRENT_SOURCE_DIR}/third_party) -#set(MILVUS_THIRD_PARTY_BUILD ${CMAKE_CURRENT_SOURCE_DIR}/third_party/build) add_compile_definitions(PROFILER=${PROFILER}) include_directories(${MILVUS_ENGINE_INCLUDE}) include_directories(${MILVUS_ENGINE_SRC}) -#include_directories(${MILVUS_THIRD_PARTY_BUILD}/include) link_directories(${CMAKE_CURRRENT_BINARY_DIR}) -#link_directories(${MILVUS_THIRD_PARTY_BUILD}/lib) -#link_directories(${MILVUS_THIRD_PARTY_BUILD}/lib64) -#execute_process(COMMAND bash build.sh -# WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party) add_subdirectory(src) diff --git a/cpp/README.md b/cpp/README.md index 69f7f8c0e0..1b2f507db2 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -1,13 +1,22 @@ ### Compilation #### Step 1: install necessery tools + Install MySQL + centos7 : - yum install gfortran flex bison + yum install gfortran qt4 flex bison mysql-devel ubuntu16.04 : - sudo apt-get install gfortran flex bison + sudo apt-get install gfortran qt4-qmake flex bison libmysqlclient-dev + +If `libmysqlclient_r.so` does not exist after installing MySQL Development Files, you need to create a symbolic link: + +``` +sudo ln -s /path/to/libmysqlclient.so 
/path/to/libmysqlclient_r.so +``` #### Step 2: build(output to cmake_build folder) + cmake_build/src/milvus_server is the server cmake_build/src/libmilvus_engine.a is the static library @@ -44,6 +53,12 @@ If you encounter the following error when building: ### Launch server Set config in cpp/conf/server_config.yaml +Add milvus/bin/lib to LD_LIBRARY_PATH + +``` +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/path/to/milvus/bin/lib +``` + Then launch server with config: cd [build output path] start_server.sh diff --git a/cpp/cmake/DefineOptions.cmake b/cpp/cmake/DefineOptions.cmake index d95e7c7ed1..147663d0db 100644 --- a/cpp/cmake/DefineOptions.cmake +++ b/cpp/cmake/DefineOptions.cmake @@ -93,6 +93,8 @@ define_option(MILVUS_WITH_SQLITE "Build with SQLite library" ON) define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON) +define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON) + define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" ON) define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON) diff --git a/cpp/cmake/ThirdPartyPackages.cmake b/cpp/cmake/ThirdPartyPackages.cmake index cb5f3532fe..9aa3f62124 100644 --- a/cpp/cmake/ThirdPartyPackages.cmake +++ b/cpp/cmake/ThirdPartyPackages.cmake @@ -26,6 +26,7 @@ set(MILVUS_THIRDPARTY_DEPENDENCIES JSONCONS LAPACK Lz4 + MySQLPP OpenBLAS Prometheus RocksDB @@ -56,12 +57,14 @@ macro(build_dependency DEPENDENCY_NAME) build_easyloggingpp() elseif("${DEPENDENCY_NAME}" STREQUAL "FAISS") build_faiss() + elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest") + build_gtest() elseif("${DEPENDENCY_NAME}" STREQUAL "LAPACK") build_lapack() elseif("${DEPENDENCY_NAME}" STREQUAL "Lz4") build_lz4() - elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest") - build_gtest() + elseif ("${DEPENDENCY_NAME}" STREQUAL "MySQLPP") + build_mysqlpp() elseif ("${DEPENDENCY_NAME}" STREQUAL "JSONCONS") build_jsoncons() elseif ("${DEPENDENCY_NAME}" STREQUAL "OpenBLAS") @@ -265,6 +268,12 @@ else() set(LZ4_SOURCE_URL 
"https://github.com/lz4/lz4/archive/${LZ4_VERSION}.tar.gz") endif() +if(DEFINED ENV{MILVUS_MYSQLPP_URL}) + set(MYSQLPP_SOURCE_URL "$ENV{MILVUS_MYSQLPP_URL}") +else() + set(MYSQLPP_SOURCE_URL "https://tangentsoft.com/mysqlpp/releases/mysql++-${MYSQLPP_VERSION}.tar.gz") +endif() + if (DEFINED ENV{MILVUS_OPENBLAS_URL}) set(OPENBLAS_SOURCE_URL "$ENV{MILVUS_OPENBLAS_URL}") else () @@ -829,8 +838,8 @@ macro(build_faiss) # ${MAKE} ${MAKE_BUILD_ARGS} BUILD_COMMAND ${MAKE} ${MAKE_BUILD_ARGS} all - COMMAND - cd gpu && make ${MAKE_BUILD_ARGS} + COMMAND + cd gpu && ${MAKE} ${MAKE_BUILD_ARGS} BUILD_IN_SOURCE 1 # INSTALL_DIR @@ -1068,6 +1077,65 @@ if(MILVUS_WITH_LZ4) include_directories(SYSTEM ${LZ4_INCLUDE_DIR}) endif() +# ---------------------------------------------------------------------- +# MySQL++ + +macro(build_mysqlpp) + message(STATUS "Building MySQL++-${MYSQLPP_VERSION} from source") + set(MYSQLPP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep") + set(MYSQLPP_INCLUDE_DIR "${MYSQLPP_PREFIX}/include") + set(MYSQLPP_SHARED_LIB + "${MYSQLPP_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}") + + set(MYSQLPP_CONFIGURE_ARGS + "--prefix=${MYSQLPP_PREFIX}" + "--enable-thread-check" + "CFLAGS=${EP_C_FLAGS}" + "CXXFLAGS=${EP_CXX_FLAGS}" + "LDFLAGS=-pthread") + + externalproject_add(mysqlpp_ep + URL + ${MYSQLPP_SOURCE_URL} +# GIT_REPOSITORY +# ${MYSQLPP_SOURCE_URL} +# GIT_TAG +# ${MYSQLPP_VERSION} +# GIT_SHALLOW +# TRUE + ${EP_LOG_OPTIONS} + CONFIGURE_COMMAND +# "./bootstrap" +# COMMAND + "./configure" + ${MYSQLPP_CONFIGURE_ARGS} + BUILD_COMMAND + ${MAKE} ${MAKE_BUILD_ARGS} + BUILD_IN_SOURCE + 1 + BUILD_BYPRODUCTS + ${MYSQLPP_SHARED_LIB}) + + file(MAKE_DIRECTORY "${MYSQLPP_INCLUDE_DIR}") + add_library(mysqlpp SHARED IMPORTED) + set_target_properties( + mysqlpp + PROPERTIES + IMPORTED_LOCATION "${MYSQLPP_SHARED_LIB}" + INTERFACE_INCLUDE_DIRECTORIES "${MYSQLPP_INCLUDE_DIR}") + + add_dependencies(mysqlpp mysqlpp_ep) + +endmacro() 
+ +if(MILVUS_WITH_MYSQLPP) + + resolve_dependency(MySQLPP) + get_target_property(MYSQLPP_INCLUDE_DIR mysqlpp INTERFACE_INCLUDE_DIRECTORIES) + include_directories(SYSTEM "${MYSQLPP_INCLUDE_DIR}") + link_directories(${MYSQLPP_PREFIX}/lib) # link_directories() has no SYSTEM keyword; it would be treated as a directory +endif() + # ---------------------------------------------------------------------- # Prometheus diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index 1a1c8303f2..c2ed775601 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -6,7 +6,12 @@ server_config: db_config: db_path: @MILVUS_DB_PATH@ # milvus data storage path - db_backend_url: http://127.0.0.1 # meta database uri + + # URI format: dialect://username:password@host:port/database + # All parts except dialect are optional, but you MUST include the delimiters + # Currently dialect supports mysql or sqlite + db_backend_url: sqlite://:@:/ + index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB archive_days_threshold: 30 # files older than x days will be archived, unit: day diff --git a/cpp/coverage.sh b/cpp/coverage.sh index 3415e752bd..cc509b611b 100755 --- a/cpp/coverage.sh +++ b/cpp/coverage.sh @@ -1,5 +1,7 @@ #!/bin/bash +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/milvus/lib + LCOV_CMD="lcov" LCOV_GEN_CMD="genhtml" @@ -20,15 +22,20 @@ if [ $? -ne 0 ]; then fi for test in `ls ${DIR_UNITTEST}`; do - echo $test - case ${test} in + case ${test} in + db_test) + # set run args for db_test + args="mysql://root:Fantast1c@192.168.1.194:3306/test" + ;; *_test) - # run unittest - ./${DIR_UNITTEST}/${test} - if [ $? -ne 0 ]; then - echo ${DIR_UNITTEST}/${test} "run failed" - fi + args="" + ;; esac + # run unittest + ./${DIR_UNITTEST}/${test} "${args}" + if [ $? 
-ne 0 ]; then + echo ${DIR_UNITTEST}/${test} "run failed" + fi done # gen test converage diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index e00420b2d1..d0029d5175 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -62,6 +62,7 @@ set(s3_client_files include_directories(/usr/include) include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include") include_directories(thrift/gen-cpp) +include_directories(/usr/include/mysql) set(third_party_libs easyloggingpp @@ -83,6 +84,7 @@ set(third_party_libs snappy zlib zstd + mysqlpp ${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so ) if (MEGASEARCH_WITH_ARROW STREQUAL "ON") @@ -181,4 +183,10 @@ endif () install(TARGETS milvus_server DESTINATION bin) -add_subdirectory(sdk) +install(FILES + ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX} + ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3 + ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3.2.4 + DESTINATION lib) #need to copy libmysqlpp.so + +#add_subdirectory(sdk) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 1ea579ff71..0a1e8651e1 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -7,11 +7,13 @@ #include "DBMetaImpl.h" #include "Log.h" #include "EngineFactory.h" +#include "Factories.h" #include "metrics/Metrics.h" #include "scheduler/TaskScheduler.h" #include "scheduler/context/DeleteContext.h" #include "utils/TimeRecorder.h" +#include "MetaConsts.h" #include #include @@ -82,11 +84,14 @@ void CollectFileMetrics(int file_type, size_t file_size, double total_time) { DBImpl::DBImpl(const Options& options) : options_(options), shutting_down_(false), - meta_ptr_(new meta::DBMetaImpl(options_.meta)), - mem_mgr_(new MemManager(meta_ptr_, options_)), compact_thread_pool_(1, 1), index_thread_pool_(1, 
1) { - StartTimerTasks(); + meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode); + mem_mgr_ = std::make_shared(meta_ptr_, options_); + // mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_)); + if (options.mode != Options::MODE::READ_ONLY) { + StartTimerTasks(); + } } Status DBImpl::CreateTable(meta::TableSchema& table_schema) { @@ -153,10 +158,6 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) { -#if 0 - return QuerySync(table_id, k, nq, vectors, dates, results); -#else - //get all table files from table meta::DatePartionedTableFilesSchema files; auto status = meta_ptr_->FilesToSearch(table_id, dates, files); @@ -170,7 +171,6 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, } return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results); -#endif } Status DBImpl::Query(const std::string& table_id, const std::vector& file_ids, @@ -198,141 +198,6 @@ Status DBImpl::Query(const std::string& table_id, const std::vector return QueryAsync(table_id, files_array, k, nq, vectors, dates, results); } -Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq, - const float* vectors, const meta::DatesT& dates, QueryResults& results) { - meta::DatePartionedTableFilesSchema files; - auto status = meta_ptr_->FilesToSearch(table_id, dates, files); - if (!status.ok()) { return status; } - - ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size(); - - meta::TableFilesSchema index_files; - meta::TableFilesSchema raw_files; - for (auto &day_files : files) { - for (auto &file : day_files.second) { - file.file_type_ == meta::TableFileSchema::INDEX ? 
- index_files.push_back(file) : raw_files.push_back(file); - } - } - - int dim = 0; - if (!index_files.empty()) { - dim = index_files[0].dimension_; - } else if (!raw_files.empty()) { - dim = raw_files[0].dimension_; - } else { - ENGINE_LOG_DEBUG << "no files to search"; - return Status::OK(); - } - - { - // [{ids, distence}, ...] - using SearchResult = std::pair, std::vector>; - std::vector batchresult(nq); // allocate nq cells. - - auto cluster = [&](long *nns, float *dis, const int& k) -> void { - for (int i = 0; i < nq; ++i) { - auto f_begin = batchresult[i].first.cbegin(); - auto s_begin = batchresult[i].second.cbegin(); - batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k); - batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k); - } - }; - - // Allocate Memory - float *output_distence; - long *output_ids; - output_distence = (float *) malloc(k * nq * sizeof(float)); - output_ids = (long *) malloc(k * nq * sizeof(long)); - memset(output_distence, 0, k * nq * sizeof(float)); - memset(output_ids, 0, k * nq * sizeof(long)); - - long search_set_size = 0; - - auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void { - for (auto &file : file_vec) { - - ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_); - index->Load(); - auto file_size = index->PhysicalSize(); - search_set_size += file_size; - - ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: " - << file_size/(1024*1024) << " M"; - - int inner_k = index->Count() < k ? 
index->Count() : k; - auto start_time = METRICS_NOW_TIME; - index->Search(nq, vectors, inner_k, output_distence, output_ids); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - CollectFileMetrics(file.file_type_, file_size, total_time); - cluster(output_ids, output_distence, inner_k); // cluster to each query - memset(output_distence, 0, k * nq * sizeof(float)); - memset(output_ids, 0, k * nq * sizeof(long)); - } - }; - - auto topk_cpu = [](const std::vector &input_data, - const int &k, - float *output_distence, - long *output_ids) -> void { - std::map> inverted_table; - for (int i = 0; i < input_data.size(); ++i) { - if (inverted_table.count(input_data[i]) == 1) { - auto& ori_vec = inverted_table[input_data[i]]; - ori_vec.push_back(i); - } - else { - inverted_table[input_data[i]] = std::vector{i}; - } - } - - int count = 0; - for (auto &item : inverted_table){ - if (count == k) break; - for (auto &id : item.second){ - output_distence[count] = item.first; - output_ids[count] = id; - if (++count == k) break; - } - } - }; - auto cluster_topk = [&]() -> void { - QueryResult res; - for (auto &result_pair : batchresult) { - auto &dis = result_pair.second; - auto &nns = result_pair.first; - - topk_cpu(dis, k, output_distence, output_ids); - - int inner_k = dis.size() < k ? 
dis.size() : k; - for (int i = 0; i < inner_k; ++i) { - res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping - } - results.push_back(res); // append to result list - res.clear(); - memset(output_distence, 0, k * nq * sizeof(float)); - memset(output_ids, 0, k * nq * sizeof(long)); - } - }; - - search_in_index(raw_files); - search_in_index(index_files); - - ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M"; - cluster_topk(); - - free(output_distence); - free(output_ids); - } - - if (results.empty()) { - return Status::NotFound("Group " + table_id + ", search result not found!"); - } - - return Status::OK(); -} - Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) { @@ -406,9 +271,14 @@ void DBImpl::StartMetricTask() { } void DBImpl::StartCompactionTask() { +// static int count = 0; +// count++; +// std::cout << "StartCompactionTask: " << count << std::endl; +// std::cout << "c: " << count++ << std::endl; static uint64_t compact_clock_tick = 0; compact_clock_tick++; if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) { +// std::cout << "c r: " << count++ << std::endl; return; } @@ -515,6 +385,10 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) { } void DBImpl::BackgroundCompaction(std::set table_ids) { +// static int b_count = 0; +// b_count++; +// std::cout << "BackgroundCompaction: " << b_count << std::endl; + Status status; for (auto& table_id : table_ids) { status = BackgroundMergeFiles(table_id); @@ -525,7 +399,13 @@ void DBImpl::BackgroundCompaction(std::set table_ids) { } meta_ptr_->Archive(); - meta_ptr_->CleanUpFilesWithTTL(1); + + int ttl = 1; + if (options_.mode == Options::MODE::CLUSTER) { + ttl = meta::D_SEC; +// ENGINE_LOG_DEBUG << "Server mode is cluster. 
Clean up files with ttl = " << std::to_string(ttl) << "seconds."; + } + meta_ptr_->CleanUpFilesWithTTL(ttl); } void DBImpl::StartBuildIndexTask() { diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index cc632847c1..9dcd174f8b 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -85,14 +85,6 @@ class DBImpl : public DB { ~DBImpl() override; private: - Status - QuerySync(const std::string &table_id, - uint64_t k, - uint64_t nq, - const float *vectors, - const meta::DatesT &dates, - QueryResults &results); - Status QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files, diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index d834d6cff2..8c56c863e7 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -183,6 +183,7 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id, } Status DBMetaImpl::CreateTable(TableSchema &table_schema) { + try { MetricCollector metric; diff --git a/cpp/src/db/ExecutionEngine.cpp b/cpp/src/db/ExecutionEngine.cpp index f27d04dfa0..3412eb34bd 100644 --- a/cpp/src/db/ExecutionEngine.cpp +++ b/cpp/src/db/ExecutionEngine.cpp @@ -11,14 +11,9 @@ namespace zilliz { namespace milvus { namespace engine { -Status ExecutionEngine::AddWithIds(const std::vector& vectors, const std::vector& vector_ids) { - long n1 = (long)vectors.size(); - long n2 = (long)vector_ids.size(); - if (n1 != n2) { - LOG(ERROR) << "vectors size is not equal to the size of vector_ids: " << n1 << "!=" << n2; - return Status::Error("Error: AddWithIds"); - } - return AddWithIds(n1, vectors.data(), vector_ids.data()); +Status ExecutionEngine::AddWithIdArray(const std::vector& vectors, const std::vector& vector_ids) { + long n = (long)vector_ids.size(); + return AddWithIds(n, vectors.data(), vector_ids.data()); } diff --git a/cpp/src/db/ExecutionEngine.h b/cpp/src/db/ExecutionEngine.h index f26dce6371..d2b4d01e67 100644 --- a/cpp/src/db/ExecutionEngine.h +++ b/cpp/src/db/ExecutionEngine.h @@ -23,8 
+23,7 @@ enum class EngineType { class ExecutionEngine { public: - virtual Status AddWithIds(const std::vector& vectors, - const std::vector& vector_ids); + virtual Status AddWithIdArray(const std::vector& vectors, const std::vector& vector_ids); virtual Status AddWithIds(long n, const float *xdata, const long *xids) = 0; diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index 99a2918b85..4b24bd3a1c 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -3,16 +3,18 @@ // Unauthorized copying of this file, via any medium is strictly prohibited. // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// +#include #include "Factories.h" #include "DBImpl.h" -#include #include #include #include #include #include #include +#include +#include "Exception.h" namespace zilliz { namespace milvus { @@ -26,6 +28,7 @@ DBMetaOptions DBMetaOptionsFactory::Build(const std::string& path) { ss << "/tmp/" << rand(); p = ss.str(); } + DBMetaOptions meta; meta.path = p; return meta; @@ -43,6 +46,48 @@ std::shared_ptr DBMetaImplFactory::Build() { return std::shared_ptr(new meta::DBMetaImpl(options)); } +std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOptions, + const int& mode) { + + std::string uri = metaOptions.backend_uri; + + std::string dialectRegex = "(.*)"; + std::string usernameRegex = "(.*)"; + std::string passwordRegex = "(.*)"; + std::string hostRegex = "(.*)"; + std::string portRegex = "(.*)"; + std::string dbNameRegex = "(.*)"; + std::string uriRegexStr = dialectRegex + "\\:\\/\\/" + + usernameRegex + "\\:" + + passwordRegex + "\\@" + + hostRegex + "\\:" + + portRegex + "\\/" + + dbNameRegex; + std::regex uriRegex(uriRegexStr); + std::smatch pieces_match; + + if (std::regex_match(uri, pieces_match, uriRegex)) { + std::string dialect = pieces_match[1].str(); + std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); + if (dialect.find("mysql") != 
std::string::npos) { + ENGINE_LOG_INFO << "Using MySQL"; + return std::make_shared(meta::MySQLMetaImpl(metaOptions, mode)); + } + else if (dialect.find("sqlite") != std::string::npos) { + ENGINE_LOG_INFO << "Using SQLite"; + return std::make_shared(meta::DBMetaImpl(metaOptions)); + } + else { + ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; + throw InvalidArgumentException("URI dialect is not mysql / sqlite"); + } + } + else { + ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri; + throw InvalidArgumentException("Wrong URI format "); + } +} + std::shared_ptr DBFactory::Build() { auto options = OptionsFactory::Build(); auto db = DBFactory::Build(options); diff --git a/cpp/src/db/Factories.h b/cpp/src/db/Factories.h index 46d3e1bbc0..889922b17a 100644 --- a/cpp/src/db/Factories.h +++ b/cpp/src/db/Factories.h @@ -7,6 +7,7 @@ #include "DB.h" #include "DBMetaImpl.h" +#include "MySQLMetaImpl.h" #include "Options.h" #include "ExecutionEngine.h" @@ -27,6 +28,7 @@ struct OptionsFactory { struct DBMetaImplFactory { static std::shared_ptr Build(); + static std::shared_ptr Build(const DBMetaOptions& metaOptions, const int& mode); }; struct DBFactory { diff --git a/cpp/src/db/MySQLConnectionPool.h b/cpp/src/db/MySQLConnectionPool.h new file mode 100644 index 0000000000..8992ba274c --- /dev/null +++ b/cpp/src/db/MySQLConnectionPool.h @@ -0,0 +1,109 @@ +#include "mysql++/mysql++.h" + +#include +#include +#include + +#include "Log.h" + +class MySQLConnectionPool : public mysqlpp::ConnectionPool { + +public: + // The object's only constructor + MySQLConnectionPool(std::string dbName, + std::string userName, + std::string passWord, + std::string serverIp, + int port = 0, + int maxPoolSize = 8) : + db_(dbName), + user_(userName), + password_(passWord), + server_(serverIp), + port_(port), + max_pool_size_(maxPoolSize) + { + + conns_in_use_ = 0; + + max_idle_time_ = 10; //10 seconds + } + + // The destructor. 
We _must_ call ConnectionPool::clear() here, + // because our superclass can't do it for us. + ~MySQLConnectionPool() override { + clear(); + } + + // Do a simple form of in-use connection limiting: wait to return + // a connection until there are a reasonably low number in use + // already. Can't do this in create() because we're interested in + // connections actually in use, not those created. Also note that + // we keep our own count; ConnectionPool::size() isn't the same! + mysqlpp::Connection* grab() override { + while (conns_in_use_ > max_pool_size_) { + sleep(1); + } + + ++conns_in_use_; + return mysqlpp::ConnectionPool::grab(); + } + + // Other half of in-use conn count limit + void release(const mysqlpp::Connection* pc) override { + mysqlpp::ConnectionPool::release(pc); + + if (conns_in_use_ <= 0) { + ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_ << std::endl; + } + else { + --conns_in_use_; + } + } + + int getConnectionsInUse() { + return conns_in_use_; + } + + void set_max_idle_time(int max_idle) { + max_idle_time_ = max_idle; + } + +protected: + + // Superclass overrides + mysqlpp::Connection* create() override { + // Create connection using the parameters we were passed upon + // creation. + mysqlpp::Connection* conn = new mysqlpp::Connection(); + conn->set_option(new mysqlpp::ReconnectOption(true)); + conn->connect(db_.empty() ? 0 : db_.c_str(), + server_.empty() ? 0 : server_.c_str(), + user_.empty() ? 0 : user_.c_str(), + password_.empty() ? 0 : password_.c_str(), + port_); + return conn; + } + + void destroy(mysqlpp::Connection* cp) override { + // Our superclass can't know how we created the Connection, so + // it delegates destruction to us, to be safe. 
+ delete cp; + } + + unsigned int max_idle_time() override { + return max_idle_time_; + } + +private: + // Number of connections currently in use + std::atomic conns_in_use_; + + // Our connection parameters + std::string db_, user_, password_, server_; + int port_; + + int max_pool_size_; + + unsigned int max_idle_time_; +}; \ No newline at end of file diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp new file mode 100644 index 0000000000..0b0cc01e5d --- /dev/null +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -0,0 +1,1872 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "MySQLMetaImpl.h" +#include "IDGenerator.h" +#include "Utils.h" +#include "Log.h" +#include "MetaConsts.h" +#include "Factories.h" +#include "metrics/Metrics.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mysql++/mysql++.h" + +namespace zilliz { +namespace milvus { +namespace engine { +namespace meta { + + using namespace mysqlpp; + +// static std::unique_ptr connectionPtr(new Connection()); +// std::recursive_mutex mysql_mutex; +// +// std::unique_ptr& MySQLMetaImpl::getConnectionPtr() { +//// static std::recursive_mutex connectionMutex_; +// std::lock_guard lock(connectionMutex_); +// return connectionPtr; +// } + + namespace { + + Status HandleException(const std::string& desc, std::exception &e) { + ENGINE_LOG_ERROR << desc << ": " << e.what(); + return Status::DBTransactionError(desc, e.what()); + } + + class MetricCollector { + public: + MetricCollector() { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + start_time_ = METRICS_NOW_TIME; + } + + ~MetricCollector() { + auto end_time = METRICS_NOW_TIME; + 
auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + } + + private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + }; + + } + + std::string MySQLMetaImpl::GetTablePath(const std::string &table_id) { + return options_.path + "/tables/" + table_id; + } + + std::string MySQLMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) { + std::stringstream ss; + ss << GetTablePath(table_id) << "/" << date; + return ss.str(); + } + + void MySQLMetaImpl::GetTableFilePath(TableFileSchema &group_file) { + if (group_file.date_ == EmptyDate) { + group_file.date_ = Meta::GetDate(); + } + std::stringstream ss; + ss << GetTableDatePartitionPath(group_file.table_id_, group_file.date_) + << "/" << group_file.file_id_; + group_file.location_ = ss.str(); + } + + Status MySQLMetaImpl::NextTableId(std::string &table_id) { + std::stringstream ss; + SimpleIDGenerator g; + ss << g.GetNextIDNumber(); + table_id = ss.str(); + return Status::OK(); + } + + Status MySQLMetaImpl::NextFileId(std::string &file_id) { + std::stringstream ss; + SimpleIDGenerator g; + ss << g.GetNextIDNumber(); + file_id = ss.str(); + return Status::OK(); + } + + MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int& mode) + : options_(options_), + mode_(mode) { + Initialize(); + } + + Status MySQLMetaImpl::Initialize() { + +// std::lock_guard lock(mysql_mutex); + + if (!boost::filesystem::is_directory(options_.path)) { + auto ret = boost::filesystem::create_directory(options_.path); + if (!ret) { + ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path; + return Status::DBTransactionError("Failed to create db directory", options_.path); + } + } + + std::string uri = options_.backend_uri; + + std::string dialectRegex = "(.*)"; + std::string usernameRegex = "(.*)"; + std::string passwordRegex = "(.*)"; + std::string hostRegex = 
"(.*)"; + std::string portRegex = "(.*)"; + std::string dbNameRegex = "(.*)"; + std::string uriRegexStr = dialectRegex + "\\:\\/\\/" + + usernameRegex + "\\:" + + passwordRegex + "\\@" + + hostRegex + "\\:" + + portRegex + "\\/" + + dbNameRegex; + std::regex uriRegex(uriRegexStr); + std::smatch pieces_match; + + if (std::regex_match(uri, pieces_match, uriRegex)) { + std::string dialect = pieces_match[1].str(); + std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); + if (dialect.find("mysql") == std::string::npos) { + return Status::Error("URI's dialect is not MySQL"); + } + const std::string username = pieces_match[2].str(); // keep owning copies: c_str() on the temporary returned by str() would dangle + const std::string password = pieces_match[3].str(); + const std::string serverAddress = pieces_match[4].str(); + unsigned int port = 0; + if (!pieces_match[5].str().empty()) { + port = std::stoi(pieces_match[5].str()); + } + const std::string dbName = pieces_match[6].str(); + //std::cout << dbName << " " << serverAddress << " " << username << " " << password << " " << port << std::endl; +// connectionPtr->set_option(new MultiStatementsOption(true)); +// connectionPtr->set_option(new mysqlpp::ReconnectOption(true)); + int threadHint = std::thread::hardware_concurrency(); + int maxPoolSize = threadHint == 0 ? 
8 : threadHint; + mysql_connection_pool_ = std::make_shared(dbName, username, password, serverAddress, port, maxPoolSize); +// std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl; + ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize); + try { + + CleanUp(); + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: connections in use = " << mysql_connection_pool_->getConnectionsInUse(); +// if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) { +// return Status::Error("DB connection failed: ", connectionPtr->error()); +// } + if (!connectionPtr->thread_aware()) { + ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it."; + return Status::Error("MySQL++ wasn't built with thread awareness! Can't run without it."); + } + Query InitializeQuery = connectionPtr->query(); + +// InitializeQuery << "SET max_allowed_packet=67108864;"; +// if (!InitializeQuery.exec()) { +// return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); +// } + +// InitializeQuery << "DROP TABLE IF EXISTS Tables, TableFiles;"; + InitializeQuery << "CREATE TABLE IF NOT EXISTS Tables (" << + "id BIGINT PRIMARY KEY AUTO_INCREMENT, " << + "table_id VARCHAR(255) UNIQUE NOT NULL, " << + "state INT NOT NULL, " << + "dimension SMALLINT NOT NULL, " << + "created_on BIGINT NOT NULL, " << + "files_cnt BIGINT DEFAULT 0 NOT NULL, " << + "engine_type INT DEFAULT 1 NOT NULL, " << + "store_raw_data BOOL DEFAULT false NOT NULL);"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str(); + } + + if (!InitializeQuery.exec()) { + return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); + } + + InitializeQuery << "CREATE TABLE IF NOT EXISTS TableFiles (" << + "id BIGINT PRIMARY KEY AUTO_INCREMENT, " 
<< + "table_id VARCHAR(255) NOT NULL, " << + "engine_type INT DEFAULT 1 NOT NULL, " << + "file_id VARCHAR(255) NOT NULL, " << + "file_type INT DEFAULT 0 NOT NULL, " << + "size BIGINT DEFAULT 0 NOT NULL, " << + "updated_time BIGINT NOT NULL, " << + "created_on BIGINT NOT NULL, " << + "date INT DEFAULT -1 NOT NULL);"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str(); + } + + if (!InitializeQuery.exec()) { + return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); + } + } //Scoped Connection + +// //Consume all results to avoid "Commands out of sync" error +// while (InitializeQuery.more_results()) { +// InitializeQuery.store_next(); +// } + return Status::OK(); + +// if (InitializeQuery.exec()) { +// std::cout << "XXXXXXXXXXXXXXXXXXXXXXXXX" << std::endl; +// while (InitializeQuery.more_results()) { +// InitializeQuery.store_next(); +// } +// return Status::OK(); +// } else { +// return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); +// } + } catch (const ConnectionFailed& er) { + ENGINE_LOG_ERROR << "Failed to connect to database server" << ": " << er.what(); + return Status::DBTransactionError("Failed to connect to database server", er.what()); + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR DURING INITIALIZATION" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR DURING INITIALIZATION", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR DURING INITIALIZATION" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR DURING INITIALIZATION", er.what()); + } catch (std::exception &e) { + return HandleException("Encounter exception during initialization", e); + } + } + else { + ENGINE_LOG_ERROR << "Wrong URI format. 
URI = " << uri; + return Status::Error("Wrong URI format"); + } + } + +// PXU TODO: Temp solution. Will fix later + Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id, + const DatesT &dates) { + +// std::lock_guard lock(mysql_mutex); + + if (dates.empty()) { + return Status::OK(); + } + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + try { + + auto yesterday = GetDateWithDelta(-1); + + for (auto &date : dates) { + if (date >= yesterday) { + return Status::Error("Could not delete partitions within 2 days"); + } + } + + std::stringstream dateListSS; + for (auto &date : dates) { + dateListSS << std::to_string(date) << ", "; + } + std::string dateListStr = dateListSS.str(); + dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", " + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropPartitionsByDates connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query dropPartitionsByDatesQuery = connectionPtr->query(); + + dropPartitionsByDatesQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "WHERE table_id = " << quote << table_id << " AND " << + "date in (" << dateListStr << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropPartitionsByDates: " << dropPartitionsByDatesQuery.str(); + } + + if (!dropPartitionsByDatesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES"; + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", + dropPartitionsByDatesQuery.error()); + } + } //Scoped Connection + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY 
DATES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", er.what()); + } + return Status::OK(); + } + + Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) { + +// std::lock_guard lock(mysql_mutex); + +// server::Metrics::GetInstance().MetaAccessTotalIncrement(); + try { + + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query createTableQuery = connectionPtr->query(); +// ENGINE_LOG_DEBUG << "Create Table in"; + if (table_schema.table_id_.empty()) { + NextTableId(table_schema.table_id_); + } else { + createTableQuery << "SELECT state FROM Tables " << + "WHERE table_id = " << quote << table_schema.table_id_ << ";"; +// ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str(); + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str(); + } + + StoreQueryResult res = createTableQuery.store(); + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + int state = res[0]["state"]; + if (TableSchema::TO_DELETE == state) { + return Status::Error("Table already exists and it is in delete state, please wait a second"); + } + else { + return Status::OK();//table already exists, no error + } + } + } +// ENGINE_LOG_DEBUG << "Create Table start"; + + table_schema.files_cnt_ = 0; + table_schema.id_ = -1; + table_schema.created_on_ = utils::GetMicroSecTimeStamp(); + +// auto start_time = METRICS_NOW_TIME; + + std::string id = 
"NULL"; //auto-increment + std::string table_id = table_schema.table_id_; + std::string state = std::to_string(table_schema.state_); + std::string dimension = std::to_string(table_schema.dimension_); + std::string created_on = std::to_string(table_schema.created_on_); + std::string files_cnt = "0"; + std::string engine_type = std::to_string(table_schema.engine_type_); + std::string store_raw_data = table_schema.store_raw_data_ ? "true" : "false"; + + createTableQuery << "INSERT INTO Tables VALUES" << + "(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " << + created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data << ");"; +// ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str(); + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str(); + } + + if (SimpleResult res = createTableQuery.execute()) { + table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? 
+// std::cout << table_schema.id_ << std::endl; + //Consume all results to avoid "Commands out of sync" error +// while (createTableQuery.more_results()) { +// createTableQuery.store_next(); +// } + } else { + ENGINE_LOG_ERROR << "Add Table Error"; + return Status::DBTransactionError("Add Table Error", createTableQuery.error()); + } + } //Scoped Connection + +// auto end_time = METRICS_NOW_TIME; +// auto total_time = METRICS_MICROSECONDS(start_time, end_time); +// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + auto table_path = GetTablePath(table_schema.table_id_); + table_schema.location_ = table_path; + if (!boost::filesystem::is_directory(table_path)) { + auto ret = boost::filesystem::create_directories(table_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + return Status::Error("Failed to create table path"); + } + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN ADDING TABLE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what()); + } catch (std::exception &e) { + return HandleException("Encounter exception when create table", e); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::DeleteTable(const std::string& table_id) { + +// std::lock_guard lock(mysql_mutex); + + try { + + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTable connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + //soft delete table + Query deleteTableQuery = 
connectionPtr->query(); +// + deleteTableQuery << "UPDATE Tables " << + "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " << + "WHERE table_id = " << quote << table_id << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str(); + } + + if (!deleteTableQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE"; + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error()); + } + + } //Scoped Connection + + + if (mode_ != Options::MODE::SINGLE) { + DeleteTableFiles(table_id); + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::DeleteTableFiles(const std::string& table_id) { + try { + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + //soft delete table files + Query deleteTableFilesQuery = connectionPtr->query(); + // + deleteTableFilesQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " << + "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " << + "WHERE table_id = " << quote << table_id << " AND " << + "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << 
"MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str(); + } + + if (!deleteTableFilesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableFilesQuery.error()); + } + } //Scoped Connection + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE FILES", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { + +// std::lock_guard lock(mysql_mutex); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DescribeTable connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query describeTableQuery = connectionPtr->query(); + describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " << + "FROM Tables " << + "WHERE table_id = " << quote << table_schema.table_id_ << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str(); + } + + res = describeTableQuery.store(); + } //Scoped Connection + + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + const Row& resRow = res[0]; + + table_schema.id_ = resRow["id"]; //implicit conversion + + table_schema.dimension_ = resRow["dimension"]; + + table_schema.files_cnt_ 
= resRow["files_cnt"]; + + table_schema.engine_type_ = resRow["engine_type"]; + + int store_raw_data = resRow["store_raw_data"]; + table_schema.store_raw_data_ = (store_raw_data == 1); + } + else { + return Status::NotFound("Table " + table_schema.table_id_ + " not found"); + } + + auto table_path = GetTablePath(table_schema.table_id_); + table_schema.location_ = table_path; + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING TABLE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBING TABLE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING TABLE", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { + +// std::lock_guard lock(mysql_mutex); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::HasTable connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query hasTableQuery = connectionPtr->query(); + //since table_id is a unique column we just need to check whether it exists or not + hasTableQuery << "SELECT EXISTS " << + "(SELECT 1 FROM Tables " << + "WHERE table_id = " << quote << table_id << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << + "AS " << quote << "check" << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str(); + } + + res = hasTableQuery.store(); + } //Scoped Connection + + assert(res && res.num_rows() == 1); + int check = res[0]["check"]; + has_or_not = (check == 1); + + } catch (const 
BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF TABLE EXISTS" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF TABLE EXISTS", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CHECKING IF TABLE EXISTS" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::AllTables(std::vector& table_schema_array) { + +// std::lock_guard lock(mysql_mutex); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::AllTables connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query allTablesQuery = connectionPtr->query(); + allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << + "FROM Tables " << + "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str(); + } + + res = allTablesQuery.store(); + } //Scoped Connection + + for (auto& resRow : res) { + TableSchema table_schema; + + table_schema.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_schema.table_id_ = table_id; + + table_schema.dimension_ = resRow["dimension"]; + + table_schema.files_cnt_ = resRow["files_cnt"]; + + table_schema.engine_type_ = resRow["engine_type"]; + + int store_raw_data = resRow["store_raw_data"]; + table_schema.store_raw_data_ = (store_raw_data == 1); + + table_schema_array.emplace_back(table_schema); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR 
<< "QUERY ERROR WHEN DESCRIBING ALL TABLES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING ALL TABLES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBING ALL TABLES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING ALL TABLES", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) { + +// std::lock_guard lock(mysql_mutex); + + if (file_schema.date_ == EmptyDate) { + file_schema.date_ = Meta::GetDate(); + } + TableSchema table_schema; + table_schema.table_id_ = file_schema.table_id_; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + try { + + MetricCollector metric; + + NextFileId(file_schema.file_id_); + file_schema.file_type_ = TableFileSchema::NEW; + file_schema.dimension_ = table_schema.dimension_; + file_schema.size_ = 0; + file_schema.created_on_ = utils::GetMicroSecTimeStamp(); + file_schema.updated_time_ = file_schema.created_on_; + file_schema.engine_type_ = table_schema.engine_type_; + GetTableFilePath(file_schema); + + std::string id = "NULL"; //auto-increment + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTableFile connection in use = " << 
mysql_connection_pool_->getConnectionsInUse(); +// } + + Query createTableFileQuery = connectionPtr->query(); + + createTableFileQuery << "INSERT INTO TableFiles VALUES" << + "(" << id << ", " << quote << table_id << ", " << engine_type << ", " << + quote << file_id << ", " << file_type << ", " << size << ", " << + updated_time << ", " << created_on << ", " << date << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str(); + } + + if (SimpleResult res = createTableFileQuery.execute()) { + file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? + + //Consume all results to avoid "Commands out of sync" error +// while (createTableFileQuery.more_results()) { +// createTableFileQuery.store_next(); +// } + } else { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE"; + return Status::DBTransactionError("Add file Error", createTableFileQuery.error()); + } + } // Scoped Connection + + auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); + + if (!boost::filesystem::is_directory(partition_path)) { + auto ret = boost::filesystem::create_directory(partition_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error"; + return Status::DBTransactionError("Failed to create partition directory"); + } + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE FILE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN ADDING TABLE FILE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what()); + } catch (std::exception& ex) { + return HandleException("Encounter exception when create table file", ex); + } + + return Status::OK(); 
+ } + + Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) { + +// std::lock_guard lock(mysql_mutex); + + files.clear(); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToIndex connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query filesToIndexQuery = connectionPtr->query(); + filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str(); + } + + res = filesToIndexQuery.store(); + } //Scoped Connection + + std::map groups; + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_file.table_id_ = table_id; + + table_file.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + auto groupItr = groups.find(table_file.table_id_); + if (groupItr == groups.end()) { + TableSchema table_schema; + table_schema.table_id_ = table_file.table_id_; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + groups[table_file.table_id_] = table_schema; +// std::cout << table_schema.dimension_ << std::endl; + } + table_file.dimension_ = groups[table_file.table_id_].dimension_; + + GetTableFilePath(table_file); + + files.push_back(table_file); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY 
ERROR WHEN FINDING TABLE FILES TO INDEX" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO INDEX", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, + const DatesT &partition, + DatePartionedTableFilesSchema &files) { + +// std::lock_guard lock(mysql_mutex); + + files.clear(); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToSearch connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + if (partition.empty()) { + + Query filesToSearchQuery = connectionPtr->query(); + filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str(); + } + + res = filesToSearchQuery.store(); + + } else { + + Query filesToSearchQuery = connectionPtr->query(); + + std::stringstream partitionListSS; + for (auto &date : partition) { + partitionListSS << std::to_string(date) << ", "; + } + std::string partitionListStr = partitionListSS.str(); + partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", " + + 
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "date IN (" << partitionListStr << ") AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str(); + } + + res = filesToSearchQuery.store(); + + } + } //Scoped Connection + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id_str; + resRow["table_id"].to_string(table_id_str); + table_file.table_id_ = table_id_str; + + table_file.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + table_file.dimension_ = table_schema.dimension_; + + GetTableFilePath(table_file); + + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + + files[table_file.date_].push_back(table_file); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what(); + 
return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, + DatePartionedTableFilesSchema &files) { + +// std::lock_guard lock(mysql_mutex); + + files.clear(); + + try { + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToMerge connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query filesToMergeQuery = connectionPtr->query(); + filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "file_type = " << std::to_string(TableFileSchema::RAW) << " " << + "ORDER BY size DESC" << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str(); + } + + res = filesToMergeQuery.store(); + } //Scoped Connection + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id_str; + resRow["table_id"].to_string(table_id_str); + table_file.table_id_ = table_id_str; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + table_file.dimension_ = table_schema.dimension_; + + GetTableFilePath(table_file); + + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + + 
files[table_file.date_].push_back(table_file); + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO MERGE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO MERGE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::GetTableFiles(const std::string& table_id, + const std::vector& ids, + TableFilesSchema& table_files) { + +// std::lock_guard lock(mysql_mutex); + + if (ids.empty()) { + return Status::OK(); + } + + std::stringstream idSS; + for (auto& id : ids) { + idSS << "id = " << std::to_string(id) << " OR "; + } + std::string idStr = idSS.str(); + idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR " + + try { + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::GetTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query getTableFileQuery = connectionPtr->query(); + getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(" << idStr << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str(); + } + + res = getTableFileQuery.store(); + } //Scoped Connection + + assert(res); + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + for (auto& resRow : res) { + + TableFileSchema 
file_schema; + + file_schema.table_id_ = table_id; + + file_schema.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + file_schema.file_id_ = file_id; + + file_schema.file_type_ = resRow["file_type"]; + + file_schema.size_ = resRow["size"]; + + file_schema.date_ = resRow["date"]; + + file_schema.dimension_ = table_schema.dimension_; + + GetTableFilePath(file_schema); + + table_files.emplace_back(file_schema); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING TABLE FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING TABLE FILES", er.what()); + } + + return Status::OK(); + } + +// PXU TODO: Support Swap + Status MySQLMetaImpl::Archive() { + +// std::lock_guard lock(mysql_mutex); + + auto &criterias = options_.archive_conf.GetCriterias(); + if (criterias.empty()) { + return Status::OK(); + } + + for (auto& kv : criterias) { + auto &criteria = kv.first; + auto &limit = kv.second; + if (criteria == "days") { + size_t usecs = limit * D_SEC * US_PS; + long now = utils::GetMicroSecTimeStamp(); + + try { + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Archive connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query archiveQuery = connectionPtr->query(); + archiveQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "WHERE created_on < " << std::to_string(now - usecs) << " AND " << + "file_type <> " << 
std::to_string(TableFileSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str(); + } + + if (!archiveQuery.exec()) { + return Status::DBTransactionError("QUERY ERROR DURING ARCHIVE", archiveQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DURING ARCHIVE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DURING ARCHIVE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DURING ARCHIVE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DURING ARCHIVE", er.what()); + } + } + if (criteria == "disk") { + uint64_t sum = 0; + Size(sum); + + auto to_delete = (sum - limit * G); + DiscardFiles(to_delete); + } + } + + return Status::OK(); + } + + Status MySQLMetaImpl::Size(uint64_t &result) { + +// std::lock_guard lock(mysql_mutex); + + result = 0; + try { + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Size connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query getSizeQuery = connectionPtr->query(); + getSizeQuery << "SELECT IFNULL(SUM(size),0) AS sum " << + "FROM TableFiles " << + "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str(); + } + + res = getSizeQuery.store(); + } //Scoped Connection + + assert(res && res.num_rows() == 1); +// if (!res) { +//// std::cout << "result is NULL" << std::endl; +// return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", getSizeQuery.error()); +// } + if (res.empty()) { + result = 0; +// std::cout << "result = 0" << std::endl; + } + else 
{ + result = res[0]["sum"]; +// std::cout << "result = " << std::to_string(result) << std::endl; + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING SIZE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING SIZE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING SIZE", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) { + +// std::lock_guard lock(mysql_mutex); + + if (to_discard_size <= 0) { +// std::cout << "in" << std::endl; + return Status::OK(); + } + ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size; + + try { + + MetricCollector metric; + + bool status; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DiscardFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query discardFilesQuery = connectionPtr->query(); + discardFilesQuery << "SELECT id, size " << + "FROM TableFiles " << + "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "ORDER BY id ASC " << + "LIMIT 10;"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str(); + } + + // std::cout << discardFilesQuery.str() << std::endl; + StoreQueryResult res = discardFilesQuery.store(); + + assert(res); + if (res.num_rows() == 0) { + return Status::OK(); + } + + TableFileSchema table_file; + std::stringstream idsToDiscardSS; + for (auto &resRow : res) { + if (to_discard_size <= 0) { + break; + } + table_file.id_ = resRow["id"]; + table_file.size_ = resRow["size"]; + idsToDiscardSS << "id = " << 
std::to_string(table_file.id_) << " OR "; + ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_ + << " table_file.size=" << table_file.size_; + to_discard_size -= table_file.size_; + } + + std::string idsToDiscardStr = idsToDiscardSS.str(); + idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR " + + discardFilesQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " << + "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " << + "WHERE " << idsToDiscardStr << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str(); + } + + status = discardFilesQuery.exec(); + if (!status) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error()); + } + } //Scoped Connection + + return DiscardFiles(to_discard_size); + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DISCARDING FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DISCARDING FILES", er.what()); + } + } + + //ZR: this function assumes all fields in file_schema have value + Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { + +// std::lock_guard lock(mysql_mutex); + + file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); + try { + + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFile connection in use = " << 
mysql_connection_pool_->getConnectionsInUse(); +// } + + Query updateTableFileQuery = connectionPtr->query(); + + //if the table has been deleted, just mark the table file as TO_DELETE + //clean thread will delete the file later + updateTableFileQuery << "SELECT state FROM Tables " << + "WHERE table_id = " << quote << file_schema.table_id_ << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str(); + } + + StoreQueryResult res = updateTableFileQuery.store(); + + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + int state = res[0]["state"]; + if (state == TableSchema::TO_DELETE) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + } else { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + + std::string id = std::to_string(file_schema.id_); + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + updateTableFileQuery << "UPDATE TableFiles " << + "SET table_id = " << quote << table_id << ", " << + "engine_type = " << engine_type << ", " << + "file_id = " << quote << file_id << ", " << + "file_type = " << file_type << ", " << + "size = " << size << ", " << + "updated_time = " << updated_time << ", " << + "created_on = " << created_on << ", " << + "date = " << date << " " << + "WHERE id = " << id << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str(); + } + + // std::cout << updateTableFileQuery.str() << std::endl; + + if (!updateTableFileQuery.exec()) { + ENGINE_LOG_DEBUG << 
"table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE"; + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", + updateTableFileQuery.error()); + } + } //Scoped Connection + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILE", er.what()); + } + return Status::OK(); + } + + Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) { + +// std::lock_guard lock(mysql_mutex); + + try { + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query updateTableFilesQuery = connectionPtr->query(); + + std::map has_tables; + for (auto &file_schema : files) { + + if (has_tables.find(file_schema.table_id_) != has_tables.end()) { + continue; + } + + updateTableFilesQuery << "SELECT EXISTS " << + "(SELECT 1 FROM Tables " << + "WHERE table_id = " << quote << file_schema.table_id_ << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << + "AS " << quote << "check" << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str(); 
+ } + + StoreQueryResult res = updateTableFilesQuery.store(); + + assert(res && res.num_rows() == 1); + int check = res[0]["check"]; + has_tables[file_schema.table_id_] = (check == 1); + } + + for (auto &file_schema : files) { + + if (!has_tables[file_schema.table_id_]) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); + + std::string id = std::to_string(file_schema.id_); + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + updateTableFilesQuery << "UPDATE TableFiles " << + "SET table_id = " << quote << table_id << ", " << + "engine_type = " << engine_type << ", " << + "file_id = " << quote << file_id << ", " << + "file_type = " << file_type << ", " << + "size = " << size << ", " << + "updated_time = " << updated_time << ", " << + "created_on = " << created_on << ", " << + "date = " << date << " " << + "WHERE id = " << id << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str(); + } + + if (!updateTableFilesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", + updateTableFilesQuery.error()); + } + } + } //Scoped Connection + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", er.what()); + } catch (const Exception& er) { + // 
Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILES", er.what()); + } + return Status::OK(); + } + + Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { +// static int b_count = 0; +// b_count++; +// std::cout << "CleanUpFilesWithTTL: " << b_count << std::endl; +// std::lock_guard lock(mysql_mutex); + + auto now = utils::GetMicroSecTimeStamp(); + try { + MetricCollector metric; + + { + +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use before creating ScopedConnection = " +// << mysql_connection_pool_->getConnectionsInUse(); + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use after creating ScopedConnection = " +// << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " << + "FROM TableFiles " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " << + "updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + } + + StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + + assert(res); + + TableFileSchema table_file; + std::vector idsToDelete; + + for (auto &resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_file.table_id_ = table_id; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.date_ = resRow["date"]; + + 
GetTableFilePath(table_file); + + ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " + << table_file.location_ << std::endl; + boost::filesystem::remove(table_file.location_); + + idsToDelete.emplace_back(std::to_string(table_file.id_)); + } + + if (!idsToDelete.empty()) { + + std::stringstream idsToDeleteSS; + for (auto &id : idsToDelete) { + idsToDeleteSS << "id = " << id << " OR "; + } + + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpFilesWithTTLQuery << "DELETE FROM TableFiles WHERE " << + idsToDeleteStr << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + } + + if (!cleanUpFilesWithTTLQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL"; + return Status::DBTransactionError("CleanUpFilesWithTTL Error", + cleanUpFilesWithTTLQuery.error()); + } + } + } //Scoped Connection + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } + + try { + MetricCollector metric; + + { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use before creating ScopedConnection = " +// << mysql_connection_pool_->getConnectionsInUse(); + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean 
tables: connection in use after creating ScopedConnection = " +// << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id " << + "FROM Tables " << + "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + } + + StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + assert(res); +// std::cout << res.num_rows() << std::endl; + + if (!res.empty()) { + + std::stringstream idsToDeleteSS; + for (auto &resRow : res) { + size_t id = resRow["id"]; + std::string table_id; + resRow["table_id"].to_string(table_id); + + auto table_path = GetTablePath(table_id); + + ENGINE_LOG_DEBUG << "Remove table folder: " << table_path; + boost::filesystem::remove_all(table_path); + + idsToDeleteSS << "id = " << std::to_string(id) << " OR "; + } + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpFilesWithTTLQuery << "DELETE FROM Tables WHERE " << + idsToDeleteStr << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + } + + if (!cleanUpFilesWithTTLQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL"; + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", + cleanUpFilesWithTTLQuery.error()); + } + } + } //Scoped Connection + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP 
FILES WITH TTL" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::CleanUp() { + +// std::lock_guard lock(mysql_mutex); + + try { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUp: connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + ENGINE_LOG_DEBUG << "Remove table file type as NEW"; + Query cleanUpQuery = connectionPtr->query(); + cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str(); + } + + if (!cleanUpQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES"; + return Status::DBTransactionError("Clean up Error", cleanUpQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) { + +// std::lock_guard lock(mysql_mutex); + + try { + MetricCollector metric; + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + + if (!status.ok()) { + return status; + } + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if 
(mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Count: connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query countQuery = connectionPtr->query(); + countQuery << "SELECT size " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str(); + } + + res = countQuery.store(); + } //Scoped Connection + + result = 0; + for (auto &resRow : res) { + size_t size = resRow["size"]; + result += size; + } + + assert(table_schema.dimension_ != 0); + result /= table_schema.dimension_; + result /= sizeof(float); + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING COUNT" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING COUNT", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING COUNT" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING COUNT", er.what()); + } + return Status::OK(); + } + + Status MySQLMetaImpl::DropAll() { + +// std::lock_guard lock(mysql_mutex); + + if (boost::filesystem::is_directory(options_.path)) { + boost::filesystem::remove_all(options_.path); + } + try { + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropAll: connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query dropTableQuery = connectionPtr->query(); + dropTableQuery << "DROP TABLE IF EXISTS Tables, 
TableFiles;"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str(); + } + + if (dropTableQuery.exec()) { + return Status::OK(); + } + else { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING TABLE"; + return Status::DBTransactionError("DROP TABLE ERROR", dropTableQuery.error()); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING TABLE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING TABLE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING TABLE", er.what()); + } + return Status::OK(); + } + + MySQLMetaImpl::~MySQLMetaImpl() { +// std::lock_guard lock(mysql_mutex); + CleanUp(); + } + +} // namespace meta +} // namespace engine +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/db/MySQLMetaImpl.h b/cpp/src/db/MySQLMetaImpl.h new file mode 100644 index 0000000000..9ff8254b60 --- /dev/null +++ b/cpp/src/db/MySQLMetaImpl.h @@ -0,0 +1,91 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. 
+ ******************************************************************************/ +#pragma once + +#include "Meta.h" +#include "Options.h" +#include "MySQLConnectionPool.h" + +#include "mysql++/mysql++.h" +#include + +namespace zilliz { +namespace milvus { +namespace engine { +namespace meta { + +// auto StoragePrototype(const std::string& path); + using namespace mysqlpp; + + class MySQLMetaImpl : public Meta { + public: + MySQLMetaImpl(const DBMetaOptions& options_, const int& mode); + + virtual Status CreateTable(TableSchema& table_schema) override; + virtual Status DescribeTable(TableSchema& group_info_) override; + virtual Status HasTable(const std::string& table_id, bool& has_or_not) override; + virtual Status AllTables(std::vector& table_schema_array) override; + + virtual Status DeleteTable(const std::string& table_id) override; + virtual Status DeleteTableFiles(const std::string& table_id) override; + + virtual Status CreateTableFile(TableFileSchema& file_schema) override; + virtual Status DropPartitionsByDates(const std::string& table_id, + const DatesT& dates) override; + + virtual Status GetTableFiles(const std::string& table_id, + const std::vector& ids, + TableFilesSchema& table_files) override; + + virtual Status UpdateTableFile(TableFileSchema& file_schema) override; + + virtual Status UpdateTableFiles(TableFilesSchema& files) override; + + virtual Status FilesToSearch(const std::string& table_id, + const DatesT& partition, + DatePartionedTableFilesSchema& files) override; + + virtual Status FilesToMerge(const std::string& table_id, + DatePartionedTableFilesSchema& files) override; + + virtual Status FilesToIndex(TableFilesSchema&) override; + + virtual Status Archive() override; + + virtual Status Size(uint64_t& result) override; + + virtual Status CleanUp() override; + + virtual Status CleanUpFilesWithTTL(uint16_t seconds) override; + + virtual Status DropAll() override; + + virtual Status Count(const std::string& table_id, uint64_t& result) 
override; + + virtual ~MySQLMetaImpl(); + + private: + Status NextFileId(std::string& file_id); + Status NextTableId(std::string& table_id); + Status DiscardFiles(long long to_discard_size); + std::string GetTablePath(const std::string& table_id); + std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date); + void GetTableFilePath(TableFileSchema& group_file); + Status Initialize(); + + const DBMetaOptions options_; + const int mode_; + + std::shared_ptr mysql_connection_pool_; + bool safe_grab = false; + +// std::mutex connectionMutex_; + }; // DBMetaImpl + +} // namespace meta +} // namespace engine +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index 919d21709c..609e3ca245 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -45,15 +45,23 @@ struct DBMetaOptions { std::string path; std::string backend_uri; ArchiveConf archive_conf = ArchiveConf("delete"); + bool sql_echo = false; }; // DBMetaOptions - struct Options { + + typedef enum { + SINGLE, + CLUSTER, + READ_ONLY + } MODE; + Options(); uint16_t memory_sync_interval = 1; //unit: second uint16_t merge_trigger_number = 2; size_t index_trigger_size = ONE_GB; //unit: byte DBMetaOptions meta; + int mode = MODE::SINGLE; }; // Options diff --git a/cpp/src/db/scheduler/TaskDispatchQueue.cpp b/cpp/src/db/scheduler/TaskDispatchQueue.cpp index 2ce0e933b4..b728e925a9 100644 --- a/cpp/src/db/scheduler/TaskDispatchQueue.cpp +++ b/cpp/src/db/scheduler/TaskDispatchQueue.cpp @@ -24,14 +24,6 @@ TaskDispatchQueue::Put(const ScheduleContextPtr &context) { return; } - if (queue_.size() >= capacity_) { - std::string error_msg = - "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " + - std::to_string(queue_.size()); - SERVER_LOG_ERROR << error_msg; - throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); - } - TaskDispatchStrategy::Schedule(context, queue_); empty_.notify_all(); @@ -42,12 
+34,6 @@ TaskDispatchQueue::Take() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); - if (queue_.empty()) { - std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; - throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); - } - ScheduleTaskPtr front(queue_.front()); queue_.pop_front(); full_.notify_all(); diff --git a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp index 7200f2584f..985f86cb09 100644 --- a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp +++ b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp @@ -74,20 +74,26 @@ public: } std::string table_id = context->table_id(); - for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) { + + //put delete task to proper position + //for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1 + //if user want to delete table1, the DeleteTask will be insert into No.6 position + for(std::list::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) { if((*iter)->type() != ScheduleTaskType::kIndexLoad) { continue; } - //put delete task to proper position IndexLoadTaskPtr loader = std::static_pointer_cast(*iter); - if(loader->file_->table_id_ == table_id) { - - task_list.insert(++iter, delete_task); - break; + if(loader->file_->table_id_ != table_id) { + continue; } + + task_list.insert(iter.base(), delete_task); + return true; } + //no task is searching this table, put DeleteTask to front of list so that the table will be delete asap + task_list.push_front(delete_task); return true; } }; diff --git a/cpp/src/sdk/CMakeLists.txt b/cpp/src/sdk/CMakeLists.txt index a43f0b85de..b51c2d5e09 100644 --- a/cpp/src/sdk/CMakeLists.txt +++ b/cpp/src/sdk/CMakeLists.txt @@ -32,4 +32,4 @@ target_link_libraries(milvus_sdk add_subdirectory(examples) -install(TARGETS milvus_sdk DESTINATION bin) +install(TARGETS milvus_sdk DESTINATION lib) diff --git 
a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index 7892a57f2b..bf859b3b4f 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -23,6 +23,33 @@ DBWrapper::DBWrapper() { if(index_size > 0) {//ensure larger than zero, unit is MB opt.index_trigger_size = (size_t)index_size * engine::ONE_MB; } + std::string sql_echo = config.GetValue(CONFIG_DB_SQL_ECHO, "off"); + if (sql_echo == "on") { + opt.meta.sql_echo = true; + } + else if (sql_echo == "off") { + opt.meta.sql_echo = false; + } + else { + std::cout << "ERROR: sql_echo specified in db_config is not one of ['on', 'off']" << std::endl; + kill(0, SIGUSR1); + } + + ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER); + std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single"); + if (mode == "single") { + opt.mode = zilliz::milvus::engine::Options::MODE::SINGLE; + } + else if (mode == "cluster") { + opt.mode = zilliz::milvus::engine::Options::MODE::CLUSTER; + } + else if (mode == "read_only") { + opt.mode = zilliz::milvus::engine::Options::MODE::READ_ONLY; + } + else { + std::cout << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']" << std::endl; + kill(0, SIGUSR1); + } //set archive config engine::ArchiveConf::CriteriaT criterial; @@ -43,9 +70,15 @@ DBWrapper::DBWrapper() { kill(0, SIGUSR1); } - zilliz::milvus::engine::DB::Open(opt, &db_); + std::string msg = opt.meta.path; + try { + zilliz::milvus::engine::DB::Open(opt, &db_); + } catch(std::exception& ex) { + msg = ex.what(); + } + if(db_ == nullptr) { - std::cout << "ERROR! Failed to open database" << std::endl; + std::cout << "ERROR! 
Failed to open database: " << msg << std::endl; kill(0, SIGUSR1); } } diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index abf898bacf..768430f023 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -19,6 +19,7 @@ static const std::string CONFIG_SERVER_ADDRESS = "address"; static const std::string CONFIG_SERVER_PORT = "port"; static const std::string CONFIG_SERVER_PROTOCOL = "transfer_protocol"; static const std::string CONFIG_SERVER_MODE = "server_mode"; +static const std::string CONFIG_CLUSTER_MODE = "mode"; static const std::string CONFIG_DB = "db_config"; static const std::string CONFIG_DB_URL = "db_backend_url"; @@ -26,6 +27,7 @@ static const std::string CONFIG_DB_PATH = "db_path"; static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold"; static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold"; static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold"; +static const std::string CONFIG_DB_SQL_ECHO = "sql_echo"; static const std::string CONFIG_LOG = "log_config"; diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 269d81a498..311760948d 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -7,6 +7,7 @@ GTEST_VERSION=1.8.1 JSONCONS_VERSION=0.126.0 LAPACK_VERSION=v3.8.0 LZ4_VERSION=v1.9.1 +MYSQLPP_VERSION=3.2.4 OPENBLAS_VERSION=v0.3.6 PROMETHEUS_VERSION=v0.7.0 ROCKSDB_VERSION=v6.0.2 diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index b868d0f1f1..38046617ae 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -5,15 +5,11 @@ #------------------------------------------------------------------------------- link_directories( "${CMAKE_BINARY_DIR}/lib" - "${GTEST_PREFIX}/lib/" - ) aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) -message(STATUS "GTEST LIB: ${GTEST_PREFIX}/lib") - 
set(unittest_srcs ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp) #${EASYLOGGINGPP_INCLUDE_DIR}/easylogging++.cc) diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index c9b7ea7dcc..5bae9190f5 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -7,6 +7,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src) +aux_source_directory(./ test_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) @@ -20,19 +21,17 @@ set(db_scheduler_srcs include_directories(/usr/local/cuda/include) link_directories("/usr/local/cuda/lib64") - +include_directories(/usr/include/mysql) set(db_test_src - ${unittest_srcs} + #${unittest_srcs} ${config_files} ${cache_srcs} ${db_srcs} ${db_scheduler_srcs} ${wrapper_src} ${require_files} - utils.cpp - db_tests.cpp - meta_tests.cpp) + ${test_srcs}) cuda_add_executable(db_test ${db_test_src}) @@ -45,8 +44,9 @@ set(db_libs boost_system boost_filesystem lz4 + mysqlpp ) target_link_libraries(db_test ${db_libs} ${unittest_libs}) -install(TARGETS db_test DESTINATION bin) \ No newline at end of file +install(TARGETS db_test DESTINATION bin) diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index aa311550ee..d505320e86 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -12,31 +12,34 @@ #include "db/DB.h" #include "db/DBImpl.h" #include "db/MetaConsts.h" +#include "db/Factories.h" using namespace zilliz::milvus; namespace { -static const std::string TABLE_NAME = "test_group"; -static constexpr int64_t TABLE_DIM = 256; + static const std::string TABLE_NAME = "test_group"; + static constexpr int64_t TABLE_DIM = 256; + static constexpr int64_t VECTOR_COUNT = 250000; 
+ static constexpr int64_t INSERT_LOOP = 100000; -engine::meta::TableSchema BuildTableSchema() { - engine::meta::TableSchema table_info; - table_info.dimension_ = TABLE_DIM; - table_info.table_id_ = TABLE_NAME; - table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; - return table_info; -} - -void BuildVectors(int64_t n, std::vector& vectors) { - vectors.clear(); - vectors.resize(n*TABLE_DIM); - float* data = vectors.data(); - for(int i = 0; i < n; i++) { - for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); - data[TABLE_DIM * i] += i / 2000.; + engine::meta::TableSchema BuildTableSchema() { + engine::meta::TableSchema table_info; + table_info.dimension_ = TABLE_DIM; + table_info.table_id_ = TABLE_NAME; + table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; + return table_info; + } + + void BuildVectors(int64_t n, std::vector& vectors) { + vectors.clear(); + vectors.resize(n*TABLE_DIM); + float* data = vectors.data(); + for(int i = 0; i < n; i++) { + for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); + data[TABLE_DIM * i] += i / 2000.; + } } -} } @@ -88,20 +91,14 @@ TEST_F(DBTest, CONFIG_TEST) { TEST_F(DBTest, DB_TEST) { - static const std::string table_name = "test_group"; - static const int table_dim = 256; - - engine::meta::TableSchema table_info; - table_info.dimension_ = table_dim; - table_info.table_id_ = table_name; - table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; + engine::meta::TableSchema table_info = BuildTableSchema(); engine::Status stat = db_->CreateTable(table_info); engine::meta::TableSchema table_info_get; - table_info_get.table_id_ = table_name; + table_info_get.table_id_ = TABLE_NAME; stat = db_->DescribeTable(table_info_get); ASSERT_STATS(stat); - ASSERT_EQ(table_info_get.dimension_, table_dim); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); engine::IDNumbers vector_ids; engine::IDNumbers target_ids; @@ -130,7 +127,7 @@ TEST_F(DBTest, DB_TEST) { prev_count = 
count; START_TIMER; - stat = db_->Query(table_name, k, qb, qxb.data(), results); + stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results); ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; STOP_TIMER(ss.str()); @@ -149,14 +146,14 @@ TEST_F(DBTest, DB_TEST) { } }); - int loop = 100000; + int loop = INSERT_LOOP; for (auto i=0; iInsertVectors(table_name, qb, qxb.data(), target_ids); + db_->InsertVectors(TABLE_NAME, qb, qxb.data(), target_ids); ASSERT_EQ(target_ids.size(), qb); } else { - db_->InsertVectors(table_name, nb, xb.data(), vector_ids); + db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); } std::this_thread::sleep_for(std::chrono::microseconds(1)); } @@ -175,7 +172,7 @@ TEST_F(DBTest, SEARCH_TEST) { ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); // prepare raw data - size_t nb = 250000; + size_t nb = VECTOR_COUNT; size_t nq = 10; size_t k = 5; std::vector xb(nb*TABLE_DIM); @@ -223,6 +220,18 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) { engine::meta::TableSchema table_info = BuildTableSchema(); engine::Status stat = db_->CreateTable(table_info); + std::vector table_schema_array; + stat = db_->AllTables(table_schema_array); + ASSERT_STATS(stat); + bool bfound = false; + for(auto& schema : table_schema_array) { + if(schema.table_id_ == TABLE_NAME) { + bfound = true; + break; + } + } + ASSERT_TRUE(bfound); + engine::meta::TableSchema table_info_get; table_info_get.table_id_ = TABLE_NAME; stat = db_->DescribeTable(table_info_get); @@ -239,7 +248,7 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) { std::vector xb; BuildVectors(nb, xb); - int loop = 100000; + int loop = INSERT_LOOP; for (auto i=0; iInsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); std::this_thread::sleep_for(std::chrono::microseconds(1)); @@ -270,7 +279,7 @@ TEST_F(DBTest2, DELETE_TEST) { uint64_t size; db_->Size(size); - int64_t nb = 100000; + int64_t nb = INSERT_LOOP; std::vector xb; BuildVectors(nb, xb); diff --git a/cpp/unittest/db/meta_tests.cpp 
b/cpp/unittest/db/meta_tests.cpp index 9baef712ab..0f3a92af09 100644 --- a/cpp/unittest/db/meta_tests.cpp +++ b/cpp/unittest/db/meta_tests.cpp @@ -39,6 +39,10 @@ TEST_F(MetaTest, TABLE_TEST) { table.table_id_ = table_id; status = impl_->CreateTable(table); ASSERT_TRUE(status.ok()); + + table.table_id_ = ""; + status = impl_->CreateTable(table); + ASSERT_TRUE(status.ok()); } TEST_F(MetaTest, TABLE_FILE_TEST) { @@ -46,6 +50,7 @@ TEST_F(MetaTest, TABLE_FILE_TEST) { meta::TableSchema table; table.table_id_ = table_id; + table.dimension_ = 256; auto status = impl_->CreateTable(table); meta::TableFileSchema table_file; @@ -54,6 +59,11 @@ TEST_F(MetaTest, TABLE_FILE_TEST) { ASSERT_TRUE(status.ok()); ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW); + uint64_t cnt = 0; + status = impl_->Count(table_id, cnt); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(cnt, 0UL); + auto file_id = table_file.file_id_; auto new_file_type = meta::TableFileSchema::INDEX; @@ -254,4 +264,9 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { ASSERT_TRUE(status.ok()); ASSERT_EQ(dated_files[table_file.date_].size(), to_index_files_cnt+raw_files_cnt+index_files_cnt); + + status = impl_->FilesToSearch(table_id, meta::DatesT(), dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(), + to_index_files_cnt+raw_files_cnt+index_files_cnt); } diff --git a/cpp/unittest/db/misc_test.cpp b/cpp/unittest/db/misc_test.cpp new file mode 100644 index 0000000000..4356746fc2 --- /dev/null +++ b/cpp/unittest/db/misc_test.cpp @@ -0,0 +1,137 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include +#include + +#include "db/FaissExecutionEngine.h" +#include "db/Exception.h" +#include "db/Status.h" +#include "db/Options.h" +#include "db/DBMetaImpl.h" +#include "db/EngineFactory.h" + +#include + +using namespace zilliz::milvus; + +namespace { + void CopyStatus(engine::Status& st1, engine::Status& st2) { + st1 = st2; + } + +} + +TEST(DBMiscTest, ENGINE_API_TEST) { + //engine api AddWithIdArray + const uint16_t dim = 512; + const long n = 10; + engine::FaissExecutionEngine engine(512, "/tmp/1", "IDMap", "IDMap,Flat"); + std::vector vectors; + std::vector ids; + for (long i = 0; i < n; i++) { + for (uint16_t k = 0; k < dim; k++) { + vectors.push_back((float) k); + } + ids.push_back(i); + } + + auto status = engine.AddWithIdArray(vectors, ids); + ASSERT_TRUE(status.ok()); + + auto engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::INVALID); + ASSERT_EQ(engine_ptr, nullptr); + + engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IVFFLAT); + ASSERT_NE(engine_ptr, nullptr); + + engine_ptr = engine::EngineFactory::Build(128, "/tmp", engine::EngineType::FAISS_IDMAP); + ASSERT_NE(engine_ptr, nullptr); +} + +TEST(DBMiscTest, EXCEPTION_TEST) { + engine::Exception ex1(""); + std::string what = ex1.what(); + ASSERT_FALSE(what.empty()); + + engine::OutOfRangeException ex2; + what = ex2.what(); + ASSERT_FALSE(what.empty()); +} + +TEST(DBMiscTest, STATUS_TEST) { + engine::Status status = engine::Status::OK(); + std::string str = status.ToString(); + ASSERT_FALSE(str.empty()); + + status = engine::Status::Error("wrong", "mistake"); + ASSERT_TRUE(status.IsError()); + str = status.ToString(); + ASSERT_FALSE(str.empty()); + + status = engine::Status::NotFound("wrong", "mistake"); + ASSERT_TRUE(status.IsNotFound()); + str = status.ToString(); + ASSERT_FALSE(str.empty()); + + status = 
engine::Status::DBTransactionError("wrong", "mistake"); + ASSERT_TRUE(status.IsDBTransactionError()); + str = status.ToString(); + ASSERT_FALSE(str.empty()); + + engine::Status status_copy = engine::Status::OK(); + CopyStatus(status_copy, status); + ASSERT_TRUE(status.IsDBTransactionError()); +} + +TEST(DBMiscTest, OPTIONS_TEST) { + try { + engine::ArchiveConf archive("$$##"); + } catch (std::exception& ex) { + ASSERT_TRUE(true); + } + + { + engine::ArchiveConf archive("delete", "no"); + ASSERT_TRUE(archive.GetCriterias().empty()); + } + + { + engine::ArchiveConf archive("delete", "1:2"); + ASSERT_TRUE(archive.GetCriterias().empty()); + } + + { + engine::ArchiveConf archive("delete", "1:2:3"); + ASSERT_TRUE(archive.GetCriterias().empty()); + } + + { + engine::ArchiveConf archive("delete"); + engine::ArchiveConf::CriteriaT criterial = { + {"disk", 1024}, + {"days", 100} + }; + archive.SetCriterias(criterial); + + auto crit = archive.GetCriterias(); + ASSERT_EQ(criterial["disk"], 1024); + ASSERT_EQ(criterial["days"], 100); + } +} + +TEST(DBMiscTest, META_TEST) { + engine::DBMetaOptions options; + options.path = "/tmp/milvus_test"; + engine::meta::DBMetaImpl impl(options); + + time_t tt; + time( &tt ); + int delta = 10; + engine::meta::DateT dt = impl.GetDate(tt, delta); + ASSERT_GT(dt, 0); +} \ No newline at end of file diff --git a/cpp/unittest/db/mysql_db_test.cpp b/cpp/unittest/db/mysql_db_test.cpp new file mode 100644 index 0000000000..db3c84751e --- /dev/null +++ b/cpp/unittest/db/mysql_db_test.cpp @@ -0,0 +1,293 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include +#include + +#include "utils.h" +#include "db/DB.h" +#include "db/DBImpl.h" +#include "db/MetaConsts.h" +#include "db/Factories.h" + +using namespace zilliz::milvus; + +namespace { + +static const std::string TABLE_NAME = "test_group"; +static constexpr int64_t TABLE_DIM = 256; +static constexpr int64_t VECTOR_COUNT = 250000; +static constexpr int64_t INSERT_LOOP = 100000; + +engine::meta::TableSchema BuildTableSchema() { + engine::meta::TableSchema table_info; + table_info.dimension_ = TABLE_DIM; + table_info.table_id_ = TABLE_NAME; + table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; + return table_info; +} + +void BuildVectors(int64_t n, std::vector& vectors) { + vectors.clear(); + vectors.resize(n*TABLE_DIM); + float* data = vectors.data(); + for(int i = 0; i < n; i++) { + for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); + data[TABLE_DIM * i] += i / 2000.; + } +} + +} + + +TEST_F(MySQLDBTest, DB_TEST) { + + auto options = GetOptions(); + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + engine::IDNumbers vector_ids; + engine::IDNumbers target_ids; + + int64_t nb = 50; + std::vector xb; + BuildVectors(nb, xb); + + int64_t qb = 5; + std::vector qxb; + BuildVectors(qb, qxb); + + std::thread search([&]() { + engine::QueryResults results; + int k = 10; + std::this_thread::sleep_for(std::chrono::seconds(2)); + + INIT_TIMER; + std::stringstream ss; + uint64_t count = 0; + uint64_t prev_count = 0; + + for (auto j=0; j<10; ++j) { + ss.str(""); + db_->Size(count); + prev_count = count; + + START_TIMER; + stat 
= db_->Query(TABLE_NAME, k, qb, qxb.data(), results); + ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; + STOP_TIMER(ss.str()); + + ASSERT_STATS(stat); + for (auto k=0; k= prev_count); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + }); + + int loop = INSERT_LOOP; + + for (auto i=0; iInsertVectors(TABLE_NAME, qb, qxb.data(), target_ids); + ASSERT_EQ(target_ids.size(), qb); + } else { + db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + } + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + search.join(); + + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; +}; + +TEST_F(MySQLDBTest, SEARCH_TEST) { + auto options = GetOptions(); + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + // prepare raw data + size_t nb = VECTOR_COUNT; + size_t nq = 10; + size_t k = 5; + std::vector xb(nb*TABLE_DIM); + std::vector xq(nq*TABLE_DIM); + std::vector ids(nb); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_real_distribution<> dis_xt(-1.0, 1.0); + for (size_t i = 0; i < nb*TABLE_DIM; i++) { + xb[i] = dis_xt(gen); + if (i < nb){ + ids[i] = i; + } + } + for (size_t i = 0; i < nq*TABLE_DIM; i++) { + xq[i] = dis_xt(gen); + } + + // result data + //std::vector nns_gt(k*nq); + std::vector nns(k*nq); // nns = nearst neg search + //std::vector dis_gt(k*nq); + std::vector dis(k*nq); + + // insert data + const int batch_size = 100; + for (int j = 0; j < nb / batch_size; ++j) { + stat = db_->InsertVectors(TABLE_NAME, batch_size, xb.data()+batch_size*j*TABLE_DIM, ids); + if (j == 200){ sleep(1);} + ASSERT_STATS(stat); + } + + sleep(2); // 
wait until build index finish + + engine::QueryResults results; + stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results); + ASSERT_STATS(stat); + + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; + + // TODO(linxj): add groundTruth assert +}; + +TEST_F(MySQLDBTest, ARHIVE_DISK_CHECK) { + + auto options = GetOptions(); + options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1"); + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + std::vector table_schema_array; + stat = db_->AllTables(table_schema_array); + ASSERT_STATS(stat); + bool bfound = false; + for(auto& schema : table_schema_array) { + if(schema.table_id_ == TABLE_NAME) { + bfound = true; + break; + } + } + ASSERT_TRUE(bfound); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + engine::IDNumbers vector_ids; + engine::IDNumbers target_ids; + + uint64_t size; + db_->Size(size); + + int64_t nb = 10; + std::vector xb; + BuildVectors(nb, xb); + + int loop = INSERT_LOOP; + for (auto i=0; iInsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + db_->Size(size); + LOG(DEBUG) << "size=" << size; + ASSERT_LE(size, 1 * engine::meta::G); + + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; +}; + +TEST_F(MySQLDBTest, DELETE_TEST) { + + auto options = GetOptions(); + options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1"); + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); +// 
std::cout << stat.ToString() << std::endl; + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + +// std::cout << "location: " << table_info_get.location_ << std::endl; + ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_)); + + engine::IDNumbers vector_ids; + + uint64_t size; + db_->Size(size); + + int64_t nb = INSERT_LOOP; + std::vector xb; + BuildVectors(nb, xb); + + int loop = 20; + for (auto i=0; iInsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + std::vector dates; + stat = db_->DeleteTable(TABLE_NAME, dates); +// std::cout << "5 sec start" << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(5)); +// std::cout << "5 sec finish" << std::endl; + ASSERT_TRUE(stat.ok()); +// ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_)); + + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; +}; diff --git a/cpp/unittest/db/mysql_meta_test.cpp b/cpp/unittest/db/mysql_meta_test.cpp new file mode 100644 index 0000000000..436086acb3 --- /dev/null +++ b/cpp/unittest/db/mysql_meta_test.cpp @@ -0,0 +1,512 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include +#include +#include + +#include "utils.h" +#include "db/MySQLMetaImpl.h" +#include "db/Factories.h" +#include "db/Utils.h" +#include "db/MetaConsts.h" + +#include "mysql++/mysql++.h" + +#include + +using namespace zilliz::milvus::engine; + +//TEST_F(MySQLTest, InitializeTest) { +// DBMetaOptions options; +// //dialect+driver://username:password@host:port/database +// options.backend_uri = "mysql://root:1234@:/test"; +// meta::MySQLMetaImpl impl(options); +// auto status = impl.Initialize(); +// std::cout << status.ToString() << std::endl; +// ASSERT_TRUE(status.ok()); +//} + +TEST_F(MySQLTest, core) { + DBMetaOptions options; +// //dialect+driver://username:password@host:port/database +// options.backend_uri = "mysql://root:1234@:/test"; +// options.path = "/tmp/vecwise_test"; + try { + options = getDBMetaOptions(); + } catch(std::exception& ex) { + ASSERT_TRUE(false); + return; + } + + int mode = Options::MODE::SINGLE; + meta::MySQLMetaImpl impl(options, mode); +// auto status = impl.Initialize(); +// ASSERT_TRUE(status.ok()); + + meta::TableSchema schema1; + schema1.table_id_ = "test1"; + schema1.dimension_ = 123; + + auto status = impl.CreateTable(schema1); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + meta::TableSchema schema2; + schema2.table_id_ = "test2"; + schema2.dimension_ = 321; + status = impl.CreateTable(schema2); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + status = impl.CreateTable(schema2); +// std::cout << status.ToString() << std::endl; +// ASSERT_THROW(impl.CreateTable(schema), mysqlpp::BadQuery); + ASSERT_TRUE(status.ok()); + + status = impl.DeleteTable(schema2.table_id_); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + size_t id1 = schema1.id_; + long created_on1 = schema1.created_on_; + status = impl.DescribeTable(schema1); + 
ASSERT_TRUE(status.ok()); + ASSERT_EQ(schema1.id_, id1); + ASSERT_EQ(schema1.table_id_, "test1"); + ASSERT_EQ(schema1.created_on_, created_on1); + ASSERT_EQ(schema1.files_cnt_, 0); + ASSERT_EQ(schema1.engine_type_, 1); + ASSERT_EQ(schema1.store_raw_data_, false); + + bool check; + status = impl.HasTable("test1", check); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(check, true); + + std::vector table_schema_array; + status = impl.AllTables(table_schema_array); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_schema_array.size(), 1); + meta::TableSchema resultSchema = table_schema_array[0]; + ASSERT_EQ(resultSchema.id_, id1); + ASSERT_EQ(resultSchema.table_id_, "test1"); + ASSERT_EQ(resultSchema.dimension_, 123); + ASSERT_EQ(resultSchema.files_cnt_, 0); + ASSERT_EQ(resultSchema.engine_type_, 1); + ASSERT_EQ(resultSchema.store_raw_data_, false); + + meta::TableFileSchema tableFileSchema; + tableFileSchema.table_id_ = "test1"; + + status = impl.CreateTableFile(tableFileSchema); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + tableFileSchema.file_type_ = meta::TableFileSchema::TO_INDEX; + status = impl.UpdateTableFile(tableFileSchema); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + meta::TableFilesSchema filesToIndex; + status = impl.FilesToIndex(filesToIndex); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToIndex.size(), 1); + meta::TableFileSchema fileToIndex = filesToIndex[0]; + ASSERT_EQ(fileToIndex.table_id_, "test1"); + ASSERT_EQ(fileToIndex.dimension_, 123); + +// meta::TableFilesSchema filesToIndex; +// status = impl.FilesToIndex(filesToIndex); +// ASSERT_TRUE(status.ok()); +// ASSERT_EQ(filesToIndex.size(), 0); + + meta::DatesT partition; + partition.push_back(tableFileSchema.date_); + meta::DatePartionedTableFilesSchema filesToSearch; + status = impl.FilesToSearch(tableFileSchema.table_id_, partition, filesToSearch); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToSearch.size(), 1); + 
ASSERT_EQ(filesToSearch[tableFileSchema.date_].size(), 1); + meta::TableFileSchema fileToSearch = filesToSearch[tableFileSchema.date_][0]; + ASSERT_EQ(fileToSearch.table_id_, "test1"); + ASSERT_EQ(fileToSearch.dimension_, 123); + + tableFileSchema.file_type_ = meta::TableFileSchema::RAW; + status = impl.UpdateTableFile(tableFileSchema); + ASSERT_TRUE(status.ok()); + + meta::DatePartionedTableFilesSchema filesToMerge; + status = impl.FilesToMerge(tableFileSchema.table_id_, filesToMerge); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToMerge.size(), 1); + ASSERT_EQ(filesToMerge[tableFileSchema.date_].size(), 1); + meta::TableFileSchema fileToMerge = filesToMerge[tableFileSchema.date_][0]; + ASSERT_EQ(fileToMerge.table_id_, "test1"); + ASSERT_EQ(fileToMerge.dimension_, 123); + + meta::TableFilesSchema resultTableFilesSchema; + std::vector ids; + ids.push_back(tableFileSchema.id_); + status = impl.GetTableFiles(tableFileSchema.table_id_, ids, resultTableFilesSchema); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(resultTableFilesSchema.size(), 1); + meta::TableFileSchema resultTableFileSchema = resultTableFilesSchema[0]; +// ASSERT_EQ(resultTableFileSchema.id_, tableFileSchema.id_); + ASSERT_EQ(resultTableFileSchema.table_id_, tableFileSchema.table_id_); + ASSERT_EQ(resultTableFileSchema.file_id_, tableFileSchema.file_id_); + ASSERT_EQ(resultTableFileSchema.file_type_, tableFileSchema.file_type_); + ASSERT_EQ(resultTableFileSchema.size_, tableFileSchema.size_); + ASSERT_EQ(resultTableFileSchema.date_, tableFileSchema.date_); + ASSERT_EQ(resultTableFileSchema.engine_type_, tableFileSchema.engine_type_); + ASSERT_EQ(resultTableFileSchema.dimension_, tableFileSchema.dimension_); + + tableFileSchema.size_ = 234; + meta::TableSchema schema3; + schema3.table_id_ = "test3"; + schema3.dimension_ = 321; + status = impl.CreateTable(schema3); + ASSERT_TRUE(status.ok()); + meta::TableFileSchema tableFileSchema2; + tableFileSchema2.table_id_ 
= "test3"; + tableFileSchema2.size_ = 345; + status = impl.CreateTableFile(tableFileSchema2); + ASSERT_TRUE(status.ok()); + meta::TableFilesSchema filesToUpdate; + filesToUpdate.emplace_back(tableFileSchema); + filesToUpdate.emplace_back(tableFileSchema2); + status = impl.UpdateTableFile(tableFileSchema); + ASSERT_TRUE(status.ok()); + + uint64_t resultSize; + status = impl.Size(resultSize); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + ASSERT_EQ(resultSize, tableFileSchema.size_ + tableFileSchema2.size_); + + uint64_t countResult; + status = impl.Count(tableFileSchema.table_id_, countResult); + ASSERT_TRUE(status.ok()); + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); + +} + +TEST_F(MySQLTest, GROUP_TEST) { + DBMetaOptions options; + try { + options = getDBMetaOptions(); + } catch(std::exception& ex) { + ASSERT_TRUE(false); + return; + } + + int mode = Options::MODE::SINGLE; + meta::MySQLMetaImpl impl(options, mode); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + group.table_id_ = table_id; + auto status = impl.CreateTable(group); + ASSERT_TRUE(status.ok()); + + auto gid = group.id_; + group.id_ = -1; + status = impl.DescribeTable(group); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(group.id_, gid); + ASSERT_EQ(group.table_id_, table_id); + + group.table_id_ = "not_found"; + status = impl.DescribeTable(group); + ASSERT_TRUE(!status.ok()); + + group.table_id_ = table_id; + status = impl.CreateTable(group); + ASSERT_TRUE(status.ok()); + + group.table_id_ = ""; + status = impl.CreateTable(group); + ASSERT_TRUE(status.ok()); + + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(MySQLTest, table_file_TEST) { + DBMetaOptions options; + try { + options = getDBMetaOptions(); + } catch(std::exception& ex) { + ASSERT_TRUE(false); + return; + } + + int mode = Options::MODE::SINGLE; + meta::MySQLMetaImpl impl(options, mode); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + 
group.table_id_ = table_id; + group.dimension_ = 256; + auto status = impl.CreateTable(group); + + meta::TableFileSchema table_file; + table_file.table_id_ = group.table_id_; + status = impl.CreateTableFile(table_file); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW); + + uint64_t cnt = 0; + status = impl.Count(table_id, cnt); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(cnt, 0UL); + + auto file_id = table_file.file_id_; + + auto new_file_type = meta::TableFileSchema::INDEX; + table_file.file_type_ = new_file_type; + + status = impl.UpdateTableFile(table_file); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.file_type_, new_file_type); + + meta::DatesT dates; + dates.push_back(meta::Meta::GetDate()); + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_FALSE(status.ok()); + + dates.clear(); + for (auto i=2; i < 10; ++i) { + dates.push_back(meta::Meta::GetDateWithDelta(-1*i)); + } + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_TRUE(status.ok()); + + table_file.date_ = meta::Meta::GetDateWithDelta(-2); + status = impl.UpdateTableFile(table_file); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.date_, meta::Meta::GetDateWithDelta(-2)); + ASSERT_FALSE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE); + + dates.clear(); + dates.push_back(table_file.date_); + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_TRUE(status.ok()); + + std::vector ids = {table_file.id_}; + meta::TableFilesSchema files; + status = impl.GetTableFiles(table_file.table_id_, ids, files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(files.size(), 1UL); + ASSERT_TRUE(files[0].file_type_ == meta::TableFileSchema::TO_DELETE); + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(MySQLTest, ARCHIVE_TEST_DAYS) { + srand(time(0)); + DBMetaOptions options; + try { + options = getDBMetaOptions(); + } 
catch(std::exception& ex) { + ASSERT_TRUE(false); + return; + } + + int days_num = rand() % 100; + std::stringstream ss; + ss << "days:" << days_num; + options.archive_conf = ArchiveConf("delete", ss.str()); + int mode = Options::MODE::SINGLE; + meta::MySQLMetaImpl impl(options, mode); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + group.table_id_ = table_id; + auto status = impl.CreateTable(group); + + meta::TableFilesSchema files; + meta::TableFileSchema table_file; + table_file.table_id_ = group.table_id_; + + auto cnt = 100; + long ts = utils::GetMicroSecTimeStamp(); + std::vector days; + std::vector ids; + for (auto i=0; i ids; + for (auto i=0; i +#include +#include +#include + +#include "db/scheduler/TaskScheduler.h" +#include "db/scheduler/TaskDispatchStrategy.h" +#include "db/scheduler/TaskDispatchQueue.h" +#include "db/scheduler/task/SearchTask.h" +#include "db/scheduler/task/DeleteTask.h" +#include "db/scheduler/task/IndexLoadTask.h" + +using namespace zilliz::milvus; + +namespace { + +engine::TableFileSchemaPtr CreateTabileFileStruct(size_t id, const std::string& table_id) { + auto file = std::make_shared(); + file->id_ = id; + file->table_id_ = table_id; + return file; +} + +} + +TEST(DBSchedulerTest, TASK_QUEUE_TEST) { + engine::TaskDispatchQueue queue; + queue.SetCapacity(1000); + queue.Put(nullptr); + ASSERT_EQ(queue.Size(), 1UL); + + auto ptr = queue.Take(); + ASSERT_EQ(ptr, nullptr); + ASSERT_TRUE(queue.Empty()); + + engine::SearchContextPtr context_ptr = std::make_shared(1, 1, nullptr); + for(size_t i = 0; i < 10; i++) { + auto file = CreateTabileFileStruct(i, "tbl"); + context_ptr->AddIndexFile(file); + } + + queue.Put(context_ptr); + ASSERT_EQ(queue.Size(), 10); + + auto index_files = context_ptr->GetIndexMap(); + + ptr = queue.Front(); + ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad); + engine::IndexLoadTaskPtr load_task = std::static_pointer_cast(ptr); + ASSERT_EQ(load_task->file_->id_, 
index_files.begin()->first); + + ptr = queue.Back(); + ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad); +} + +TEST(DBSchedulerTest, SEARCH_SCHEDULER_TEST) { + std::list task_list; + bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list); + ASSERT_FALSE(ret); + + for(size_t i = 10; i < 30; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, "tbl"); + task_list.push_back(task_ptr); + } + + engine::SearchContextPtr context_ptr = std::make_shared(1, 1, nullptr); + for(size_t i = 0; i < 20; i++) { + auto file = CreateTabileFileStruct(i, "tbl"); + context_ptr->AddIndexFile(file); + } + + ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 30); +} + +TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) { + std::list task_list; + bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list); + ASSERT_FALSE(ret); + + const std::string table_id = "to_delete_table"; + for(size_t i = 0; i < 10; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, table_id); + task_list.push_back(task_ptr); + } + + for(size_t i = 0; i < 10; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, "other_table"); + task_list.push_back(task_ptr); + } + + engine::meta::Meta::Ptr meta_ptr; + engine::DeleteContextPtr context_ptr = std::make_shared(table_id, meta_ptr); + ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 21); + + auto temp_list = task_list; + for(size_t i = 0; ; i++) { + engine::ScheduleTaskPtr task_ptr = temp_list.front(); + temp_list.pop_front(); + if(task_ptr->type() == engine::ScheduleTaskType::kDelete) { + ASSERT_EQ(i, 10); + break; + } + } + + context_ptr = std::make_shared("no_task_table", meta_ptr); + ret = 
engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 22); + + engine::ScheduleTaskPtr task_ptr = task_list.front(); + ASSERT_EQ(task_ptr->type(), engine::ScheduleTaskType::kDelete); +} diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index 0de876b39c..70c0712549 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -11,9 +11,29 @@ #include "utils.h" #include "db/Factories.h" +#include "db/Options.h" + +INITIALIZE_EASYLOGGINGPP using namespace zilliz::milvus; +static std::string uri; + +class DBTestEnvironment : public ::testing::Environment { +public: + +// explicit DBTestEnvironment(std::string uri) : uri_(uri) {} + + static std::string getURI() { + return uri; + } + + void SetUp() override { + getURI(); + } + +}; + void ASSERT_STATS(engine::Status& stat) { ASSERT_TRUE(stat.ok()); if(!stat.ok()) { @@ -21,6 +41,7 @@ void ASSERT_STATS(engine::Status& stat) { } } + void DBTest::InitLog() { el::Configurations defaultConf; defaultConf.setToDefault(); @@ -32,6 +53,7 @@ void DBTest::InitLog() { engine::Options DBTest::GetOptions() { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; return options; } @@ -50,6 +72,7 @@ engine::Options DBTest2::GetOptions() { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1"); + options.meta.backend_uri = "sqlite://:@:/"; return options; } @@ -61,3 +84,34 @@ void MetaTest::SetUp() { void MetaTest::TearDown() { impl_->DropAll(); } + +zilliz::milvus::engine::DBMetaOptions MySQLTest::getDBMetaOptions() { +// std::string path = "/tmp/milvus_test"; +// engine::DBMetaOptions options = engine::DBMetaOptionsFactory::Build(path); + zilliz::milvus::engine::DBMetaOptions options; + options.path = "/tmp/milvus_test"; + options.backend_uri = DBTestEnvironment::getURI(); 
+ + if(options.backend_uri.empty()) { + throw std::exception(); + } + + return options; +} + +zilliz::milvus::engine::Options MySQLDBTest::GetOptions() { + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = DBTestEnvironment::getURI(); + return options; +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + if (argc > 1) { + uri = argv[1]; + } +// std::cout << uri << std::endl; + ::testing::AddGlobalTestEnvironment(new DBTestEnvironment); + return RUN_ALL_TESTS(); +} diff --git a/cpp/unittest/db/utils.h b/cpp/unittest/db/utils.h index cf0ff360f1..361c24b4be 100644 --- a/cpp/unittest/db/utils.h +++ b/cpp/unittest/db/utils.h @@ -8,9 +8,11 @@ #include #include +//#include #include "db/DB.h" #include "db/DBMetaImpl.h" +#include "db/MySQLMetaImpl.h" #define TIMING @@ -28,9 +30,28 @@ #define STOP_TIMER(name) #endif - void ASSERT_STATS(zilliz::milvus::engine::Status& stat); +//class TestEnv : public ::testing::Environment { +//public: +// +// static std::string getURI() { +// if (const char* uri = std::getenv("MILVUS_DBMETA_URI")) { +// return uri; +// } +// else { +// return ""; +// } +// } +// +// void SetUp() override { +// getURI(); +// } +// +//}; +// +//::testing::Environment* const test_env = +// ::testing::AddGlobalTestEnvironment(new TestEnv); class DBTest : public ::testing::Test { protected: @@ -55,3 +76,14 @@ protected: virtual void SetUp() override; virtual void TearDown() override; }; + +class MySQLTest : public ::testing::Test { +protected: +// std::shared_ptr impl_; + zilliz::milvus::engine::DBMetaOptions getDBMetaOptions(); +}; + +class MySQLDBTest : public ::testing::Test { +protected: + zilliz::milvus::engine::Options GetOptions(); +}; diff --git a/cpp/unittest/metrics/CMakeLists.txt b/cpp/unittest/metrics/CMakeLists.txt index 80210772a8..d31e44c056 100644 --- a/cpp/unittest/metrics/CMakeLists.txt +++ b/cpp/unittest/metrics/CMakeLists.txt @@ -17,6 +17,7 @@ 
aux_source_directory(../../src/config config_files) aux_source_directory(../../src/cache cache_srcs) aux_source_directory(../../src/wrapper wrapper_src) aux_source_directory(../../src/metrics metrics_src) +aux_source_directory(./ test_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) @@ -35,6 +36,7 @@ link_directories("/usr/local/cuda/lib64") #include_directories(../db/utils.h) include_directories(../../src/metrics) +include_directories(/usr/include/mysql) #set(metrics_src_files # ../../src/metrics/Metrics.cpp @@ -47,17 +49,14 @@ include_directories(../../src/metrics) # ) set(count_test_src - ${unittest_srcs} ${config_files} ${cache_srcs} ${db_srcs} ${db_scheduler_srcs} ${wrapper_src} ${metrics_src} - metrics_test.cpp - prometheus_test.cpp - ../db/utils.cpp - metricbase_test.cpp) + ${test_srcs} + ) add_executable(metrics_test ${count_test_src} ${require_files} ) @@ -75,6 +74,7 @@ target_link_libraries(metrics_test gtest pthread z + mysqlpp ${unittest_libs} ) diff --git a/cpp/unittest/metrics/metricbase_test.cpp b/cpp/unittest/metrics/metricbase_test.cpp index ac850c7b48..1997748fdd 100644 --- a/cpp/unittest/metrics/metricbase_test.cpp +++ b/cpp/unittest/metrics/metricbase_test.cpp @@ -11,7 +11,7 @@ using namespace zilliz::milvus; -TEST(MetricbaseTest, Metricbase_Test){ +TEST(MetricbaseTest, METRICBASE_TEST){ server::MetricsBase instance = server::MetricsBase::GetInstance(); instance.Init(); server::SystemInfo::GetInstance().Init(); diff --git a/cpp/unittest/metrics/metrics_test.cpp b/cpp/unittest/metrics/metrics_test.cpp index 923c7b717b..883e63ed03 100644 --- a/cpp/unittest/metrics/metrics_test.cpp +++ b/cpp/unittest/metrics/metrics_test.cpp @@ -15,7 +15,7 @@ #include #include "metrics/Metrics.h" -#include "../db/utils.h" +#include "utils.h" #include "db/DB.h" #include "db/DBMetaImpl.h" #include "db/Factories.h" @@ -24,7 +24,7 @@ using namespace 
zilliz::milvus; -TEST_F(DBTest, Metric_Tes) { +TEST_F(MetricTest, Metric_Tes) { server::SystemInfo::GetInstance().Init(); // server::Metrics::GetInstance().Init(); diff --git a/cpp/unittest/metrics/prometheus_test.cpp b/cpp/unittest/metrics/prometheus_test.cpp index 885abed566..521e00fc5c 100644 --- a/cpp/unittest/metrics/prometheus_test.cpp +++ b/cpp/unittest/metrics/prometheus_test.cpp @@ -11,7 +11,7 @@ using namespace zilliz::milvus; -TEST(PrometheusTest, Prometheus_Test){ +TEST(PrometheusTest, PROMETHEUS_TEST){ server::PrometheusMetrics instance = server::PrometheusMetrics::GetInstance(); instance.Init(); instance.SetStartup(true); diff --git a/cpp/unittest/metrics/utils.cpp b/cpp/unittest/metrics/utils.cpp new file mode 100644 index 0000000000..81e924a87e --- /dev/null +++ b/cpp/unittest/metrics/utils.cpp @@ -0,0 +1,79 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. 
+//////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include + +#include "utils.h" +#include "db/Factories.h" +#include "db/Options.h" + +INITIALIZE_EASYLOGGINGPP + +using namespace zilliz::milvus; + +static std::string uri; + +class DBTestEnvironment : public ::testing::Environment { +public: + +// explicit DBTestEnvironment(std::string uri) : uri_(uri) {} + + static std::string getURI() { + return uri; + } + + void SetUp() override { + getURI(); + } + +}; + +void ASSERT_STATS(engine::Status& stat) { + ASSERT_TRUE(stat.ok()); + if(!stat.ok()) { + std::cout << stat.ToString() << std::endl; + } +} + + +void MetricTest::InitLog() { + el::Configurations defaultConf; + defaultConf.setToDefault(); + defaultConf.set(el::Level::Debug, + el::ConfigurationType::Format, "[%thread-%datetime-%level]: %msg (%fbase:%line)"); + el::Loggers::reconfigureLogger("default", defaultConf); +} + +engine::Options MetricTest::GetOptions() { + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; + return options; +} + +void MetricTest::SetUp() { + InitLog(); + auto options = GetOptions(); + db_ = engine::DBFactory::Build(options); +} + +void MetricTest::TearDown() { + delete db_; + boost::filesystem::remove_all("/tmp/milvus_test"); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + if (argc > 1) { + uri = argv[1]; + } +// std::cout << uri << std::endl; + ::testing::AddGlobalTestEnvironment(new DBTestEnvironment); + return RUN_ALL_TESTS(); +} diff --git a/cpp/unittest/metrics/utils.h b/cpp/unittest/metrics/utils.h new file mode 100644 index 0000000000..1badce00f2 --- /dev/null +++ b/cpp/unittest/metrics/utils.h @@ -0,0 +1,64 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any 
medium is strictly prohibited. +// Proprietary and confidential. +//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include +//#include + +#include "db/DB.h" +#include "db/DBMetaImpl.h" +#include "db/MySQLMetaImpl.h" + + +#define TIMING + +#ifdef TIMING +#define INIT_TIMER auto start = std::chrono::high_resolution_clock::now(); +#define START_TIMER start = std::chrono::high_resolution_clock::now(); +#define STOP_TIMER(name) LOG(DEBUG) << "RUNTIME of " << name << ": " << \ + std::chrono::duration_cast( \ + std::chrono::high_resolution_clock::now()-start \ + ).count() << " ms "; +#else +#define INIT_TIMER +#define START_TIMER +#define STOP_TIMER(name) +#endif + +void ASSERT_STATS(zilliz::milvus::engine::Status& stat); + +//class TestEnv : public ::testing::Environment { +//public: +// +// static std::string getURI() { +// if (const char* uri = std::getenv("MILVUS_DBMETA_URI")) { +// return uri; +// } +// else { +// return ""; +// } +// } +// +// void SetUp() override { +// getURI(); +// } +// +//}; +// +//::testing::Environment* const test_env = +// ::testing::AddGlobalTestEnvironment(new TestEnv); + +class MetricTest : public ::testing::Test { +protected: + zilliz::milvus::engine::DB* db_; + + void InitLog(); + virtual void SetUp() override; + virtual void TearDown() override; + virtual zilliz::milvus::engine::Options GetOptions(); +}; \ No newline at end of file diff --git a/cpp/unittest/server/appendix/server_config.yaml b/cpp/unittest/server/appendix/server_config.yaml index 9019461940..c937c57833 100644 --- a/cpp/unittest/server/appendix/server_config.yaml +++ b/cpp/unittest/server/appendix/server_config.yaml @@ -2,11 +2,14 @@ server_config: address: 0.0.0.0 port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534 gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1 - mode: single # milvus deployment type: single, cluster + mode: single # milvus deployment type: 
single, cluster, read_only db_config: db_path: /tmp/milvus # milvus data storage path - db_backend_url: http://127.0.0.1 # meta database uri + #URI format: dialect://username:password@host:port/database + #All parts except dialect are optional, but you MUST include the delimiters + #Currently supports mysql or sqlite + db_backend_url: mysql://root:1234@:/test # meta database uri index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB archive_days_threshold: 30 # files older than x days will be archived, unit: day