diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 6fcb31a257..14f22b40bf 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -4,32 +4,10 @@ # Proprietary and confidential. #------------------------------------------------------------------------------- -cmake_minimum_required(VERSION 3.14) -message(STATUS "Building using CMake version: ${CMAKE_VERSION}") +cmake_minimum_required(VERSION 3.12) -set(MEGASEARCH_VERSION "0.1.0") - -string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" MEGASEARCH_BASE_VERSION "${MEGASEARCH_VERSION}") - -project(megasearch VERSION "${MEGASEARCH_BASE_VERSION}") project(vecwise_engine LANGUAGES CUDA CXX) -set(MEGASEARCH_VERSION_MAJOR "${megasearch_VERSION_MAJOR}") -set(MEGASEARCH_VERSION_MINOR "${megasearch_VERSION_MINOR}") -set(MEGASEARCH_VERSION_PATCH "${megasearch_VERSION_PATCH}") - -if(MEGASEARCH_VERSION_MAJOR STREQUAL "" - OR MEGASEARCH_VERSION_MINOR STREQUAL "" - OR MEGASEARCH_VERSION_PATCH STREQUAL "") - message(FATAL_ERROR "Failed to determine MegaSearch version from '${MEGASEARCH_VERSION}'") -endif() - -message(STATUS "MegaSearch version: " - "${MEGASEARCH_VERSION_MAJOR}.${MEGASEARCH_VERSION_MINOR}.${MEGASEARCH_VERSION_PATCH} " - "(full: '${MEGASEARCH_VERSION}')") - -set(MEGASEARCH_SOURCE_DIR ${PROJECT_SOURCE_DIR}) -set(MEGASEARCH_BINARY_DIR ${PROJECT_BINARY_DIR}) find_package(CUDA) set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -Xcompiler -fPIC -std=c++11 -D_FORCE_INLINES -arch sm_60 --expt-extended-lambda") set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O0 -g") @@ -54,31 +32,28 @@ else() set(VECWISE_BUILD_ARCH unknown) endif() +if(DEFINED UNIX) + message("building vecwise on Unix") + set(VECWISE_BUILD_SYSTEM macos) +elseif(DEFINED APPLE) + message("building vecwise on MacOS") + set(VECWISE_BUILD_SYSTEM unix) +else() + message("unknown OS") + set(VECWISE_BUILD_SYSTEM unknown) +endif () if(CMAKE_BUILD_TYPE STREQUAL "Release") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -fPIC -DELPP_THREAD_SAFE") + if (GPU_VERSION STREQUAL "ON") + set(ENABLE_LICENSE "ON") + add_definitions("-DENABLE_LICENSE") + endif () else() set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0 -g -fPIC -DELPP_THREAD_SAFE") endif() -if (GPU_VERSION STREQUAL "ON") - set(ENABLE_LICENSE "ON") - add_definitions("-DENABLE_LICENSE") -endif () - -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") - -if (BUILD_UNIT_TEST) - option(MEGASEARCH_BUILD_TESTS "Build the megasearch test suite" ON) -endif(BUILD_UNIT_TEST) - -include(ExternalProject) -include(ThirdPartyPackages) - -include_directories(${MEGASEARCH_SOURCE_DIR}) -link_directories(${MEGASEARCH_BINARY_DIR}) - -## Following should be check +set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/" ${CMAKE_MODULE_PATH}) set(VECWISE_ENGINE_INCLUDE ${PROJECT_SOURCE_DIR}/include) set(VECWISE_ENGINE_SRC ${PROJECT_SOURCE_DIR}/src) @@ -97,9 +72,11 @@ link_directories(${VECWISE_THIRD_PARTY_BUILD}/lib64) #execute_process(COMMAND bash build.sh # WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party) - add_subdirectory(src) add_subdirectory(test_client) +#add_subdirectory(unittest) +add_subdirectory(unittest) + if (BUILD_UNIT_TEST) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/unittest) diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index a3d9effb46..49f2558f6d 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -4,22 +4,25 @@ # Proprietary and confidential. #------------------------------------------------------------------------------- + aux_source_directory(cache cache_files) aux_source_directory(config config_files) aux_source_directory(server server_files) aux_source_directory(utils utils_files) aux_source_directory(db db_files) aux_source_directory(wrapper wrapper_files) +aux_source_directory(metrics metrics_files) + set(license_check_files license/LicenseLibrary.cpp license/LicenseCheck.cpp ) -set(license_generator_src +set(license_generator_files license/LicenseGenerator.cpp license/LicenseLibrary.cpp - ) + ../unittest/metrics/counter_test.cpp ../unittest/metrics/metrics_test.cpp) set(service_files thrift/gen-cpp/VecService.cpp @@ -27,16 +30,18 @@ set(service_files thrift/gen-cpp/megasearch_types.cpp ) -set(vecwise_engine_src +set(vecwise_engine_files ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp ${cache_files} ${db_files} ${wrapper_files} + ${metrics_files} ) -set(get_sys_info_src +set(get_sys_info_files license/GetSysInfo.cpp) + include_directories(/usr/include) include_directories(/usr/local/cuda/include) @@ -52,6 +57,9 @@ if (GPU_VERSION STREQUAL "ON") cudart cublas libsqlite3.a + libprometheus-cpp-push.a + libprometheus-cpp-pull.a + libprometheus-cpp-core.a ) else() set(engine_libs @@ -62,6 +70,9 @@ else() libgfortran.a libquadmath.a libsqlite3.a + libprometheus-cpp-push.a + libprometheus-cpp-pull.a + libprometheus-cpp-core.a ) endif () @@ -80,7 +91,8 @@ if (ENABLE_LICENSE STREQUAL "ON") endif () -cuda_add_library(vecwise_engine STATIC ${vecwise_engine_src}) + +cuda_add_library(vecwise_engine STATIC ${vecwise_engine_files}) target_link_libraries(vecwise_engine ${engine_libs}) if (ENABLE_LICENSE STREQUAL "ON") @@ -88,14 +100,26 @@ if (ENABLE_LICENSE STREQUAL "ON") target_link_libraries(vecwise_license ${license_libs}) endif () +#set(metrics_lib +# libprometheus-cpp-push.a +# libprometheus-cpp-pull.a +# libprometheus-cpp-core.a +# ) + +add_library(metrics STATIC ${metrics_files}) +#target_link_libraries(metrics ${metrics_lib}) + add_executable(vecwise_server ${config_files} ${server_files} ${utils_files} ${service_files} +# ${metrics_files} ${VECWISE_THIRD_PARTY_BUILD}/include/easylogging++.cc ) + + set(server_libs vecwise_engine librocksdb.a @@ -119,11 +143,17 @@ else () endif() if (ENABLE_LICENSE STREQUAL "ON") - add_executable(license_generator ${license_generator_src}) - add_executable(get_sys_info ${get_sys_info_src}) + add_executable(license_generator ${license_generator_files}) target_link_libraries(get_sys_info ${license_libs} vecwise_license) target_link_libraries(license_generator ${license_libs}) - install(TARGETS get_sys_info DESTINATION bin) endif () -install(TARGETS vecwise_server DESTINATION bin) \ No newline at end of file +install(TARGETS vecwise_server DESTINATION bin) + +#target_link_libraries( +# libprometheus-cpp-push.a +# libprometheus-cpp-pull.a +# libprometheus-cpp-core.a +# pthread +# z +# ${CURL_LIBRARIES}) \ No newline at end of file diff --git a/cpp/src/cache/CacheMgr.cpp b/cpp/src/cache/CacheMgr.cpp index 9a875a5e01..b90b059e97 100644 --- a/cpp/src/cache/CacheMgr.cpp +++ b/cpp/src/cache/CacheMgr.cpp @@ -5,6 +5,7 @@ //////////////////////////////////////////////////////////////////////////////// #include "CacheMgr.h" +#include "metrics/Metrics.h" namespace zilliz { namespace vecwise { @@ -37,7 +38,7 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) { if(cache_ == nullptr) { return nullptr; } - + METRICS_INSTANCE.CacheAccessTotalIncrement(); return cache_->get(key); } @@ -56,6 +57,7 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) { } cache_->insert(key, data); + METRICS_INSTANCE.CacheAccessTotalIncrement(); } void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index) { @@ -65,6 +67,7 @@ void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index DataObjPtr obj = std::make_shared(index); cache_->insert(key, obj); + METRICS_INSTANCE.CacheAccessTotalIncrement(); } void CacheMgr::EraseItem(const std::string& key) { @@ -73,6 +76,7 @@ void CacheMgr::EraseItem(const std::string& key) { } cache_->erase(key); + METRICS_INSTANCE.CacheAccessTotalIncrement(); } void CacheMgr::PrintInfo() { diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 216a9b352d..1c8a062fa9 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -13,15 +13,18 @@ #include #include #include +#include "../utils/Log.h" #include "DBImpl.h" #include "DBMetaImpl.h" #include "Env.h" +#include "metrics/Metrics.h" namespace zilliz { namespace vecwise { namespace engine { + template DBImpl::DBImpl(const Options& options) : _env(options.env), @@ -36,41 +39,92 @@ DBImpl::DBImpl(const Options& options) template Status DBImpl::add_group(meta::GroupSchema& group_info) { - return _pMeta->add_group(group_info); + Status result = _pMeta->add_group(group_info); + if(result.ok()){ +// SERVER_LOG_INFO << "add_group request successed"; +// server::Metrics::GetInstance().add_group_success_total().Increment(); + } else{ +// SERVER_LOG_INFO << "add_group request failed"; +// server::Metrics::GetInstance().add_group_fail_total().Increment(); + } + return result; } template Status DBImpl::get_group(meta::GroupSchema& group_info) { - return _pMeta->get_group(group_info); + Status result = _pMeta->get_group(group_info); + if(result.ok()){ +// SERVER_LOG_INFO << "get_group request successed"; +// server::Metrics::GetInstance().get_group_success_total().Increment(); + } else{ +// SERVER_LOG_INFO << "get_group request failed"; +// server::Metrics::GetInstance().get_group_fail_total().Increment(); + } + return result; } template Status DBImpl::has_group(const std::string& group_id_, bool& has_or_not_) { - return _pMeta->has_group(group_id_, has_or_not_); + Status result = _pMeta->has_group(group_id_, has_or_not_); + if(result.ok()){ +// SERVER_LOG_INFO << "has_group request successed"; +// server::Metrics::GetInstance().has_group_success_total().Increment(); + } else{ +// SERVER_LOG_INFO << "has_group request failed"; +// server::Metrics::GetInstance().has_group_fail_total().Increment(); + } + return result; } template Status DBImpl::get_group_files(const std::string& group_id, const int date_delta, meta::GroupFilesSchema& group_files_info) { - return _pMeta->get_group_files(group_id, date_delta, group_files_info); + Status result = _pMeta->get_group_files(group_id, date_delta, group_files_info); + if(result.ok()){ +// SERVER_LOG_INFO << "get_group_files request successed"; +// server::Metrics::GetInstance().get_group_files_success_total().Increment(); + } else{ +// SERVER_LOG_INFO << "get_group_files request failed"; +// server::Metrics::GetInstance().get_group_files_fail_total().Increment(); + } + return result; } template Status DBImpl::add_vectors(const std::string& group_id_, size_t n, const float* vectors, IDNumbers& vector_ids_) { + + auto start_time = METRICS_NOW_TIME; Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_); + auto end_time = METRICS_NOW_TIME; + +// std::chrono::microseconds time_span = std::chrono::duration_cast(end_time - start_time); +// double average_time = double(time_span.count()) / n; + + double total_time = METRICS_MICROSECONDS(start_time,end_time); + double avg_time = total_time / n; + for (int i = 0; i < n; ++i) { + METRICS_INSTANCE.AddVectorsDurationHistogramOberve(avg_time); + } + +// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time)); + if (!status.ok()) { + METRICS_INSTANCE.AddVectorsFailTotalIncrement(n); return status; } + METRICS_INSTANCE.AddVectorsSuccessTotalIncrement(n); } template Status DBImpl::search(const std::string &group_id, size_t k, size_t nq, const float *vectors, QueryResults &results) { + meta::DatesT dates = {meta::Meta::GetDate()}; return search(group_id, k, nq, vectors, dates, results); + } template @@ -132,11 +186,33 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq, index.Load(); auto file_size = index.PhysicalSize()/(1024*1024); search_set_size += file_size; + LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: " << file_size << " M"; int inner_k = index.Count() < k ? index.Count() : k; + auto start_time = METRICS_NOW_TIME; index.Search(nq, vectors, inner_k, output_distence, output_ids); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + if(file.file_type == meta::GroupFileSchema::RAW) { + METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time); + METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024); + METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024); + METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024); + + } else if(file.file_type == meta::GroupFileSchema::TO_INDEX) { + METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time); + METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024); + METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024); + METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024); + + } else { + METRICS_INSTANCE.SearchIndexDataDurationSecondsHistogramObserve(total_time); + METRICS_INSTANCE.IndexFileSizeHistogramObserve(file_size*1024*1024); + METRICS_INSTANCE.IndexFileSizeTotalIncrement(file_size*1024*1024); + METRICS_INSTANCE.IndexFileSizeGaugeSet(file_size*1024*1024); + } cluster(output_ids, output_distence, inner_k); // cluster to each query memset(output_distence, 0, k * nq * sizeof(float)); memset(output_ids, 0, k * nq * sizeof(long)); @@ -269,8 +345,14 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::Dat long index_size = 0; for (auto& file : files) { + + auto start_time = METRICS_NOW_TIME; index.Merge(file.location); auto file_schema = file; + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MemTableMergeDurationSecondsHistogramObserve(total_time); + file_schema.file_type = meta::GroupFileSchema::TO_DELETE; updated.push_back(file_schema); LOG(DEBUG) << "Merging file " << file_schema.file_id; @@ -279,6 +361,7 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::Dat if (index_size >= _options.index_trigger_size) break; } + index.Serialize(); if (index_size >= _options.index_trigger_size) { @@ -340,7 +423,11 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { EngineT to_index(file.dimension, file.location); to_index.Load(); + auto start_time = METRICS_NOW_TIME; auto index = to_index.BuildIndex(group_file.location); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + METRICS_INSTANCE.BuildIndexDurationSecondsHistogramObserve(total_time); group_file.file_type = meta::GroupFileSchema::INDEX; group_file.rows = index->Size(); diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index aaaaf21ce4..1de2164441 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -14,6 +14,7 @@ #include #include "DBMetaImpl.h" #include "IDGenerator.h" +#include "metrics/Metrics.h" namespace zilliz { namespace vecwise { @@ -105,6 +106,7 @@ Status DBMetaImpl::initialize() { } Status DBMetaImpl::add_group(GroupSchema& group_info) { + METRICS_INSTANCE.MetaAccessTotalIncrement(); if (group_info.group_id == "") { std::stringstream ss; SimpleIDGenerator g; @@ -113,7 +115,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) { } group_info.files_cnt = 0; group_info.id = -1; - + auto start_time = METRICS_NOW_TIME; { try { auto id = ConnectorPtr->insert(group_info); @@ -123,6 +125,9 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) { return Status::DBTransactionError("Add Group Error"); } } + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); auto group_path = GetGroupPath(group_info.group_id); @@ -143,11 +148,16 @@ Status DBMetaImpl::get_group(GroupSchema& group_info) { Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) { try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; auto groups = ConnectorPtr->select(columns(&GroupSchema::id, &GroupSchema::group_id, &GroupSchema::files_cnt, &GroupSchema::dimension), where(c(&GroupSchema::group_id) == group_info.group_id)); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); assert(groups.size() <= 1); if (groups.size() == 1) { group_info.id = std::get<0>(groups[0]); @@ -166,8 +176,13 @@ Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) { Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) { try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; auto groups = ConnectorPtr->select(columns(&GroupSchema::id), where(c(&GroupSchema::group_id) == group_id)); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); assert(groups.size() <= 1); if (groups.size() == 1) { has_or_not = true; @@ -204,7 +219,12 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { { try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; auto id = ConnectorPtr->insert(group_file); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); group_file.id = id; /* LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id; */ } catch (...) { @@ -229,6 +249,8 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { files.clear(); try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time =METRICS_NOW_TIME; auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, &GroupFileSchema::file_id, @@ -236,6 +258,9 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { &GroupFileSchema::rows, &GroupFileSchema::date), where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX)); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); std::map groups; @@ -277,6 +302,8 @@ Status DBMetaImpl::files_to_search(const std::string &group_id, const DatesT& dates = (partition.empty() == true) ? today : partition; try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, &GroupFileSchema::file_id, @@ -288,7 +315,9 @@ Status DBMetaImpl::files_to_search(const std::string &group_id, (c(&GroupFileSchema::file_type) == (int) GroupFileSchema::RAW or c(&GroupFileSchema::file_type) == (int) GroupFileSchema::TO_INDEX or c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX))); - + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); GroupSchema group_info; group_info.group_id = group_id; auto status = get_group_no_lock(group_info); @@ -325,6 +354,8 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id, files.clear(); try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, &GroupFileSchema::file_id, @@ -333,7 +364,9 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id, &GroupFileSchema::date), where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW and c(&GroupFileSchema::group_id) == group_id)); - + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); GroupSchema group_info; group_info.group_id = group_id; auto status = get_group_no_lock(group_info); @@ -389,7 +422,12 @@ Status DBMetaImpl::get_group_files(const std::string& group_id_, Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { group_file.updated_time = GetMicroSecTimeStamp(); try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; ConnectorPtr->update(group_file); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); /* auto commited = ConnectorPtr->transaction([&] () mutable { */ /* ConnectorPtr->update(group_file); */ /* return true; */ @@ -407,11 +445,16 @@ Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { Status DBMetaImpl::update_files(GroupFilesSchema& files) { try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; auto commited = ConnectorPtr->transaction([&] () mutable { for (auto& file : files) { file.updated_time = GetMicroSecTimeStamp(); ConnectorPtr->update(file); } + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); return true; }); if (!commited) { @@ -500,13 +543,17 @@ Status DBMetaImpl::cleanup() { Status DBMetaImpl::count(const std::string& group_id, long& result) { try { + METRICS_INSTANCE.MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; auto selected = ConnectorPtr->select(columns(&GroupFileSchema::rows, &GroupFileSchema::date), where((c(&GroupFileSchema::file_type) == (int)GroupFileSchema::RAW or c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX or c(&GroupFileSchema::file_type) == (int)GroupFileSchema::INDEX) and c(&GroupFileSchema::group_id) == group_id)); - + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time,end_time); + METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); GroupSchema group_info; group_info.group_id = group_id; auto status = get_group_no_lock(group_info); diff --git a/cpp/src/db/FaissExecutionEngine.cpp b/cpp/src/db/FaissExecutionEngine.cpp index 605b979481..bb06fab8a2 100644 --- a/cpp/src/db/FaissExecutionEngine.cpp +++ b/cpp/src/db/FaissExecutionEngine.cpp @@ -16,6 +16,7 @@ #include #include "FaissExecutionEngine.h" +#include "metrics/Metrics.h" namespace zilliz { namespace vecwise { @@ -65,6 +66,7 @@ template Status FaissExecutionEngine::Load() { auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool to_cache = false; + auto start_time = METRICS_NOW_TIME; if (!index) { index = read_index(location_); to_cache = true; @@ -74,6 +76,16 @@ Status FaissExecutionEngine::Load() { pIndex_ = index->data(); if (to_cache) { Cache(); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + + METRICS_INSTANCE.FaissDiskLoadDurationSecondsHistogramObserve(total_time); + double total_size = (pIndex_->d) * (pIndex_->ntotal) * 4; + + + METRICS_INSTANCE.FaissDiskLoadSizeBytesHistogramObserve(total_size); + METRICS_INSTANCE.FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time)); + } return Status::OK(); } diff --git a/cpp/src/license/LicenseCheck.cpp b/cpp/src/license/LicenseCheck.cpp index 4ea29663cf..ca8fb4f930 100644 --- a/cpp/src/license/LicenseCheck.cpp +++ b/cpp/src/license/LicenseCheck.cpp @@ -74,7 +74,7 @@ LicenseCheck::AlterFile(const std::string &license_file_path, { exit(1); } -// printf("---runing---\n"); + printf("---runing---\n"); pt->expires_at(pt->expires_at() + boost::posix_time::hours(1)); pt->async_wait(boost::bind(AlterFile, license_file_path, boost::asio::placeholders::error, pt)); return SERVER_SUCCESS; @@ -83,7 +83,8 @@ LicenseCheck::AlterFile(const std::string &license_file_path, ServerError LicenseCheck::StartCountingDown(const std::string &license_file_path) { - if (!LicenseLibrary::IsFileExistent(license_file_path)) exit(1); + + if (!LicenseLibrary::IsFileExistent(license_file_path)) return SERVER_LICENSE_FILE_NOT_EXIST; boost::asio::io_service io; boost::asio::deadline_timer t(io, boost::posix_time::hours(1)); t.async_wait(boost::bind(AlterFile, license_file_path, boost::asio::placeholders::error, &t)); diff --git a/cpp/src/license/LicenseCheck.h b/cpp/src/license/LicenseCheck.h index 0fd1d87f35..9a22f57f5a 100644 --- a/cpp/src/license/LicenseCheck.h +++ b/cpp/src/license/LicenseCheck.h @@ -36,6 +36,7 @@ class LicenseCheck { static ServerError StartCountingDown(const std::string &license_file_path); + private: }; diff --git a/cpp/src/license/LicenseLibrary.cpp b/cpp/src/license/LicenseLibrary.cpp index d67d7cdcd4..669810552e 100644 --- a/cpp/src/license/LicenseLibrary.cpp +++ b/cpp/src/license/LicenseLibrary.cpp @@ -324,7 +324,7 @@ LicenseLibrary::GPUinfoFileDeserialization(const std::string &path, } ServerError -LicenseLibrary::GetDateTime(const char *cha, time_t &data_time) { +LicenseLibrary::GetDateTime(char *cha, time_t &data_time) { tm tm_; int year, month, day; sscanf(cha, "%d-%d-%d", &year, &month, &day); diff --git a/cpp/src/license/LicenseLibrary.h b/cpp/src/license/LicenseLibrary.h index d5e97ac8a3..a4202b1a0b 100644 --- a/cpp/src/license/LicenseLibrary.h +++ b/cpp/src/license/LicenseLibrary.h @@ -92,7 +92,7 @@ class LicenseLibrary { std::map &uuid_encrption_map); static ServerError - GetDateTime(const char *cha, time_t &data_time); + GetDateTime(char *cha, time_t &data_time); private: diff --git a/cpp/src/metrics/Metrics.cpp b/cpp/src/metrics/Metrics.cpp new file mode 100644 index 0000000000..4d0dd507cb --- /dev/null +++ b/cpp/src/metrics/Metrics.cpp @@ -0,0 +1,33 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Metrics.h" + +namespace zilliz { +namespace vecwise { +namespace server { + +ServerError +PrometheusMetrics::Init() { + ConfigNode& configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC); + startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true:false; + // Following should be read from config file. + const std::string bind_address = "8080"; + const std::string uri = std::string("/metrics"); + const std::size_t num_threads = 2; + + // Init Exposer + exposer_ptr_ = std::make_shared(bind_address, uri, num_threads); + + // Exposer Registry + exposer_ptr_->RegisterCollectable(registry_); + + return SERVER_SUCCESS; +} + +} +} +} diff --git a/cpp/src/metrics/Metrics.h b/cpp/src/metrics/Metrics.h new file mode 100644 index 0000000000..666a979358 --- /dev/null +++ b/cpp/src/metrics/Metrics.h @@ -0,0 +1,469 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "utils/Error.h" +#include +#include + + +#include +#include +#include "server/ServerConfig.h" + +#define METRICS_NOW_TIME std::chrono::system_clock::now() +#define METRICS_INSTANCE server::GetInstance() +#define METRICS_MICROSECONDS(a,b) (std::chrono::duration_cast (b-a)).count(); + + +namespace zilliz { +namespace vecwise { +namespace server { + +class MetricsBase{ + public: + static MetricsBase& + GetInstance(){ + static MetricsBase instance; + return instance; + } + + virtual ServerError Init() {}; + virtual void AddGroupSuccessTotalIncrement(double value = 1) {}; + virtual void AddGroupFailTotalIncrement(double value = 1) {}; + virtual void HasGroupSuccessTotalIncrement(double value = 1) {}; + virtual void HasGroupFailTotalIncrement(double value = 1) {}; + virtual void GetGroupSuccessTotalIncrement(double value = 1) {}; + virtual void GetGroupFailTotalIncrement(double value = 1) {}; + virtual void GetGroupFilesSuccessTotalIncrement(double value = 1) {}; + virtual void GetGroupFilesFailTotalIncrement(double value = 1) {}; + virtual void AddVectorsSuccessTotalIncrement(double value = 1) {}; + virtual void AddVectorsFailTotalIncrement(double value = 1) {}; + virtual void AddVectorsDurationHistogramOberve(double value) {}; + virtual void SearchSuccessTotalIncrement(double value = 1) {}; + virtual void SearchFailTotalIncrement(double value = 1) {}; + virtual void SearchDurationHistogramObserve(double value) {}; + virtual void RawFileSizeHistogramObserve(double value) {}; + virtual void IndexFileSizeHistogramObserve(double value) {}; + virtual void BuildIndexDurationSecondsHistogramObserve(double value) {}; + virtual void AllBuildIndexDurationSecondsHistogramObserve(double value) {}; + virtual void CacheUsageGaugeIncrement(double value = 1) {}; + virtual void CacheUsageGaugeDecrement(double value = 1) {}; + virtual void CacheUsageGaugeSet(double value) {}; + virtual void MetaVisitTotalIncrement(double value = 1) {}; + virtual void MetaVisitDurationSecondsHistogramObserve(double value) {}; + virtual void MemUsagePercentGaugeSet(double value) {}; + virtual void MemUsagePercentGaugeIncrement(double value = 1) {}; + virtual void MemUsagePercentGaugeDecrement(double value = 1) {}; + virtual void MemUsageTotalGaugeSet(double value) {}; + virtual void MemUsageTotalGaugeIncrement(double value = 1) {}; + virtual void MemUsageTotalGaugeDecrement(double value = 1) {}; + virtual void MetaAccessTotalIncrement(double value = 1) {}; + virtual void MetaAccessDurationSecondsHistogramObserve(double value) {}; + virtual void FaissDiskLoadDurationSecondsHistogramObserve(double value) {}; + virtual void FaissDiskLoadSizeBytesHistogramObserve(double value) {}; + virtual void FaissDiskLoadIOSpeedHistogramObserve(double value) {}; + virtual void CacheAccessTotalIncrement(double value = 1) {}; + virtual void MemTableMergeDurationSecondsHistogramObserve(double value) {}; + virtual void SearchIndexDataDurationSecondsHistogramObserve(double value) {}; + virtual void SearchRawDataDurationSecondsHistogramObserve(double value) {}; + virtual void IndexFileSizeTotalIncrement(double value = 1) {}; + virtual void RawFileSizeTotalIncrement(double value = 1) {}; + virtual void IndexFileSizeGaugeSet(double value) {}; + virtual void RawFileSizeGaugeSet(double value) {}; + + + +}; + +enum class MetricCollectorType{ + INVALID, + PROMETHEUS, + ZABBIX +}; + + + +class PrometheusMetrics: public MetricsBase { + + public: + static PrometheusMetrics & + GetInstance() { +// switch(MetricCollectorType) { +// case: prometheus:: +// static +// } + static PrometheusMetrics instance; + return instance; + } + + ServerError + Init(); + + private: + std::shared_ptr exposer_ptr_; + std::shared_ptr registry_ = std::make_shared(); + bool startup_ = false; + public: + + void AddGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) add_group_success_total_.Increment(value);}; + void AddGroupFailTotalIncrement(double value = 1.0) override { if(startup_) add_group_fail_total_.Increment(value);}; + void HasGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) has_group_success_total_.Increment(value);}; + void HasGroupFailTotalIncrement(double value = 1.0) override { if(startup_) has_group_fail_total_.Increment(value);}; + void GetGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) get_group_success_total_.Increment(value);}; + void GetGroupFailTotalIncrement(double value = 1.0) override { if(startup_) get_group_fail_total_.Increment(value);}; + void GetGroupFilesSuccessTotalIncrement(double value = 1.0) override { if(startup_) get_group_files_success_total_.Increment(value);}; + void GetGroupFilesFailTotalIncrement(double value = 1.0) override { if(startup_) get_group_files_fail_total_.Increment(value);}; + void AddVectorsSuccessTotalIncrement(double value = 1.0) override { if(startup_) add_vectors_success_total_.Increment(value);}; + void AddVectorsFailTotalIncrement(double value = 1.0) override { if(startup_) add_vectors_fail_total_.Increment(value);}; + void AddVectorsDurationHistogramOberve(double value) override { if(startup_) add_vectors_duration_histogram_.Observe(value);}; + void SearchSuccessTotalIncrement(double value = 1.0) override { if(startup_) search_success_total_.Increment(value);}; + void SearchFailTotalIncrement(double value = 1.0) override { if(startup_) search_fail_total_.Increment(value); }; + void SearchDurationHistogramObserve(double value) override { if(startup_) search_duration_histogram_.Observe(value);}; + void RawFileSizeHistogramObserve(double value) override { if(startup_) raw_files_size_histogram_.Observe(value);}; + void IndexFileSizeHistogramObserve(double value) override { if(startup_) index_files_size_histogram_.Observe(value);}; + void BuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) build_index_duration_seconds_histogram_.Observe(value);}; + void AllBuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) all_build_index_duration_seconds_histogram_.Observe(value);}; + void CacheUsageGaugeIncrement(double value = 1.0) override { if(startup_) cache_usage_gauge_.Increment(value);}; + void CacheUsageGaugeDecrement(double value = 1.0) override { if(startup_) cache_usage_gauge_.Decrement(value);}; + void CacheUsageGaugeSet(double value) override { if(startup_) cache_usage_gauge_.Set(value);}; +// void MetaVisitTotalIncrement(double value = 1.0) override { meta_visit_total_.Increment(value);}; +// void MetaVisitDurationSecondsHistogramObserve(double value) override { meta_visit_duration_seconds_histogram_.Observe(value);}; + void MemUsagePercentGaugeSet(double value) override { if(startup_) mem_usage_percent_gauge_.Set(value);}; + void MemUsagePercentGaugeIncrement(double value = 1.0) override { if(startup_) mem_usage_percent_gauge_.Increment(value);}; + void MemUsagePercentGaugeDecrement(double value = 1.0) override { if(startup_) mem_usage_percent_gauge_.Decrement(value);}; + void MemUsageTotalGaugeSet(double value) override { if(startup_) mem_usage_total_gauge_.Set(value);}; + void MemUsageTotalGaugeIncrement(double value = 1.0) override { if(startup_) mem_usage_total_gauge_.Increment(value);}; + void MemUsageTotalGaugeDecrement(double value = 1.0) override { if(startup_) mem_usage_total_gauge_.Decrement(value);}; + + void MetaAccessTotalIncrement(double value = 1) { if(startup_) meta_access_total_.Increment(value);}; + void MetaAccessDurationSecondsHistogramObserve(double value) { if(startup_) meta_access_duration_seconds_histogram_.Observe(value);}; + + void FaissDiskLoadDurationSecondsHistogramObserve(double value) { if(startup_) faiss_disk_load_duration_seconds_histogram_.Observe(value);}; + void FaissDiskLoadSizeBytesHistogramObserve(double value) { if(startup_) faiss_disk_load_size_bytes_histogram_.Observe(value);}; + void FaissDiskLoadIOSpeedHistogramObserve(double value) { if(startup_) faiss_disk_load_IO_speed_histogram_.Observe(value);}; + + void CacheAccessTotalIncrement(double value = 1) { if(startup_) cache_access_total_.Increment(value);}; + void MemTableMergeDurationSecondsHistogramObserve(double value) { if(startup_) mem_table_merge_duration_seconds_histogram_.Observe(value);}; + void SearchIndexDataDurationSecondsHistogramObserve(double value) { if(startup_) search_index_data_duration_seconds_histogram_.Observe(value);}; + void SearchRawDataDurationSecondsHistogramObserve(double value) { if(startup_) search_raw_data_duration_seconds_histogram_.Observe(value);}; + void IndexFileSizeTotalIncrement(double value = 1) { if(startup_) index_file_size_total_.Increment(value);}; + void RawFileSizeTotalIncrement(double value = 1) { if(startup_) raw_file_size_total_.Increment(value);}; + void IndexFileSizeGaugeSet(double value) { if(startup_) index_file_size_gauge_.Set(value);}; + void RawFileSizeGaugeSet(double value) { if(startup_) raw_file_size_gauge_.Set(value);}; + + + + + +// prometheus::Counter &connection_total() {return connection_total_; } +// +// prometheus::Counter &add_group_success_total() { return add_group_success_total_; } +// prometheus::Counter &add_group_fail_total() { return add_group_fail_total_; } +// +// prometheus::Counter &get_group_success_total() { return get_group_success_total_;} +// prometheus::Counter &get_group_fail_total() { return get_group_fail_total_;} +// +// prometheus::Counter &has_group_success_total() { return has_group_success_total_;} +// prometheus::Counter &has_group_fail_total() { return has_group_fail_total_;} +// +// prometheus::Counter &get_group_files_success_total() { return get_group_files_success_total_;}; +// prometheus::Counter &get_group_files_fail_total() { return get_group_files_fail_total_;} +// +// prometheus::Counter &add_vectors_success_total() { return add_vectors_success_total_; } +// prometheus::Counter &add_vectors_fail_total() { return add_vectors_fail_total_; } +// +// prometheus::Histogram &add_vectors_duration_histogram() { return add_vectors_duration_histogram_;} +// +// prometheus::Counter &search_success_total() { return search_success_total_; } +// prometheus::Counter &search_fail_total() { return search_fail_total_; } +// +// prometheus::Histogram &search_duration_histogram() { return search_duration_histogram_; } +// prometheus::Histogram &raw_files_size_histogram() { return raw_files_size_histogram_; } +// prometheus::Histogram &index_files_size_histogram() { return index_files_size_histogram_; } +// +// prometheus::Histogram &build_index_duration_seconds_histogram() { return build_index_duration_seconds_histogram_; } +// +// prometheus::Histogram &all_build_index_duration_seconds_histogram() { return all_build_index_duration_seconds_histogram_; } +// +// prometheus::Gauge &cache_usage_gauge() { return cache_usage_gauge_; } +// +// prometheus::Counter &meta_visit_total() { return meta_visit_total_; } +// +// prometheus::Histogram &meta_visit_duration_seconds_histogram() { return meta_visit_duration_seconds_histogram_; } +// +// prometheus::Gauge &mem_usage_percent_gauge() { return mem_usage_percent_gauge_; } +// +// prometheus::Gauge &mem_usage_total_gauge() { return mem_usage_total_gauge_; } + + + + + std::shared_ptr &exposer_ptr() {return exposer_ptr_; } +// prometheus::Exposer& exposer() { return exposer_;} + std::shared_ptr ®istry_ptr() {return registry_; } + + // ..... + private: + ////all from db_connection.cpp +// prometheus::Family &connect_request_ = prometheus::BuildCounter() +// .Name("connection_total") +// .Help("total number of connection has been made") +// .Register(*registry_); +// prometheus::Counter &connection_total_ = connect_request_.Add({}); + + + + ////all from DBImpl.cpp + using BucketBoundaries = std::vector; + //record add_group request + prometheus::Family &add_group_request_ = prometheus::BuildCounter() + .Name("add_group_request_total") + .Help("the number of add_group request") + .Register(*registry_); + + prometheus::Counter &add_group_success_total_ = add_group_request_.Add({{"outcome", "success"}}); + prometheus::Counter &add_group_fail_total_ = add_group_request_.Add({{"outcome", "fail"}}); + + + //record get_group request + prometheus::Family &get_group_request_ = prometheus::BuildCounter() + .Name("get_group_request_total") + .Help("the number of get_group request") + .Register(*registry_); + + prometheus::Counter &get_group_success_total_ = get_group_request_.Add({{"outcome", "success"}}); + prometheus::Counter &get_group_fail_total_ = get_group_request_.Add({{"outcome", "fail"}}); + + + //record has_group request + prometheus::Family &has_group_request_ = prometheus::BuildCounter() + .Name("has_group_request_total") + .Help("the number of has_group request") + .Register(*registry_); + + prometheus::Counter &has_group_success_total_ = has_group_request_.Add({{"outcome", "success"}}); + prometheus::Counter &has_group_fail_total_ = has_group_request_.Add({{"outcome", "fail"}}); + + + //record get_group_files + prometheus::Family &get_group_files_request_ = prometheus::BuildCounter() + .Name("get_group_files_request_total") + .Help("the number of get_group_files request") + .Register(*registry_); + + prometheus::Counter &get_group_files_success_total_ = get_group_files_request_.Add({{"outcome", "success"}}); + prometheus::Counter &get_group_files_fail_total_ = get_group_files_request_.Add({{"outcome", "fail"}}); + + + //record add_vectors count and average time + //need to be considered + prometheus::Family &add_vectors_request_ = prometheus::BuildCounter() + .Name("add_vectors_request_total") + .Help("the number of vectors added") + .Register(*registry_); + prometheus::Counter &add_vectors_success_total_ = add_vectors_request_.Add({{"outcome", "success"}}); + prometheus::Counter &add_vectors_fail_total_ = add_vectors_request_.Add({{"outcome", "fail"}}); + + prometheus::Family &add_vectors_duration_seconds_ = prometheus::BuildHistogram() + .Name("add_vector_duration_seconds") + .Help("average time of adding every vector") + .Register(*registry_); + prometheus::Histogram &add_vectors_duration_histogram_ = add_vectors_duration_seconds_.Add({}, BucketBoundaries{0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.08, 0.1, 0.5, 1}); + + + //record search count and average time + prometheus::Family &search_request_ = prometheus::BuildCounter() + .Name("search_request_total") + .Help("the number of search request") + .Register(*registry_); + prometheus::Counter &search_success_total_ = search_request_.Add({{"outcome","success"}}); + prometheus::Counter &search_fail_total_ = search_request_.Add({{"outcome","fail"}}); + + prometheus::Family &search_request_duration_seconds_ = prometheus::BuildHistogram() + .Name("search_request_duration_second") + .Help("histogram of processing time for each search") + .Register(*registry_); + prometheus::Histogram &search_duration_histogram_ = search_request_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); + + //record raw_files size histogram + prometheus::Family &raw_files_size_ = prometheus::BuildHistogram() + .Name("search_raw_files_bytes") + .Help("histogram of raw files size by bytes") + .Register(*registry_); + prometheus::Histogram &raw_files_size_histogram_ = raw_files_size_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); + + //record index_files size histogram + prometheus::Family &index_files_size_ = prometheus::BuildHistogram() + .Name("search_index_files_bytes") + .Help("histogram of index files size by bytes") + .Register(*registry_); + prometheus::Histogram &index_files_size_histogram_ = index_files_size_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); + + //record index and raw files size counter + prometheus::Family &file_size_total_ = prometheus::BuildCounter() + .Name("search_file_size_total") + .Help("searched index and raw file size") + .Register(*registry_); + prometheus::Counter &index_file_size_total_ = file_size_total_.Add({{"type", "index"}}); + prometheus::Counter &raw_file_size_total_ = file_size_total_.Add({{"type", "raw"}}); + + //record index and raw files size counter + prometheus::Family &file_size_gauge_ = prometheus::BuildGauge() + .Name("search_file_size_gauge") + .Help("searched current index and raw file size") + .Register(*registry_); + prometheus::Gauge &index_file_size_gauge_ = file_size_gauge_.Add({{"type", "index"}}); + prometheus::Gauge &raw_file_size_gauge_ = file_size_gauge_.Add({{"type", "raw"}}); + + //record processing time for building index + prometheus::Family &build_index_duration_seconds_ = prometheus::BuildHistogram() + .Name("build_index_duration_seconds") + .Help("histogram of processing time for building index") + .Register(*registry_); + prometheus::Histogram &build_index_duration_seconds_histogram_ = build_index_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); + + + //record processing time for all building index + prometheus::Family &all_build_index_duration_seconds_ = prometheus::BuildHistogram() + .Name("all_build_index_duration_seconds") + .Help("histogram of processing time for building index") + .Register(*registry_); + prometheus::Histogram &all_build_index_duration_seconds_histogram_ = all_build_index_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); + + //record duration of merging mem table + prometheus::Family &mem_table_merge_duration_seconds_ = prometheus::BuildHistogram() + .Name("mem_table_merge_duration_seconds") + .Help("histogram of processing time for merging mem tables") + .Register(*registry_); + prometheus::Histogram &mem_table_merge_duration_seconds_histogram_ = mem_table_merge_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); + + //record search index and raw data duration + prometheus::Family &search_data_duration_seconds_ = prometheus::BuildHistogram() + .Name("search_data_duration_seconds") + .Help("histograms of processing time for search index and raw data") + .Register(*registry_); + prometheus::Histogram &search_index_data_duration_seconds_histogram_ = search_data_duration_seconds_.Add({{"type", "index"}}, BucketBoundaries{0.1, 1.0, 10.0}); + prometheus::Histogram &search_raw_data_duration_seconds_histogram_ = search_data_duration_seconds_.Add({{"type", "raw"}}, BucketBoundaries{0.1, 1.0, 10.0}); + + + ////all form Cache.cpp + //record cache usage, when insert/erase/clear/free + prometheus::Family &cache_usage_ = prometheus::BuildGauge() + .Name("cache_usage") + .Help("total bytes that cache used") + .Register(*registry_); + prometheus::Gauge &cache_usage_gauge_ = cache_usage_.Add({}); + + + ////all from Meta.cpp + //record meta visit count and time +// prometheus::Family &meta_visit_ = prometheus::BuildCounter() +// .Name("meta_visit_total") +// .Help("the number of accessing Meta") +// .Register(*registry_); +// prometheus::Counter &meta_visit_total_ = meta_visit_.Add({{}}); +// +// prometheus::Family &meta_visit_duration_seconds_ = prometheus::BuildHistogram() +// .Name("meta_visit_duration_seconds") +// .Help("histogram of processing time to get data from mata") +// .Register(*registry_); +// prometheus::Histogram &meta_visit_duration_seconds_histogram_ = meta_visit_duration_seconds_.Add({{}}, BucketBoundaries{0.1, 1.0, 10.0}); + + + ////all from MemManager.cpp + //record memory usage percent + prometheus::Family &mem_usage_percent_ = prometheus::BuildGauge() + .Name("memory_usage_percent") + .Help("memory usage percent") + .Register(*registry_); + prometheus::Gauge &mem_usage_percent_gauge_ = mem_usage_percent_.Add({}); + + //record memory usage toal + prometheus::Family &mem_usage_total_ = prometheus::BuildGauge() + .Name("memory_usage_total") + .Help("memory usage total") + .Register(*registry_); + prometheus::Gauge &mem_usage_total_gauge_ = mem_usage_total_.Add({}); + + + + ////all from DBMetaImpl.cpp + //record meta access count + prometheus::Family &meta_access_ = prometheus::BuildCounter() + .Name("meta_access_total") + .Help("the number of meta accessing") + .Register(*registry_); + prometheus::Counter &meta_access_total_ = meta_access_.Add({}); + + //record meta access duration + prometheus::Family &meta_access_duration_seconds_ = prometheus::BuildHistogram() + .Name("meta_access_duration_seconds") + .Help("histogram of processing time for accessing mata") + .Register(*registry_); + prometheus::Histogram &meta_access_duration_seconds_histogram_ = meta_access_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); + + + + ////all from FaissExecutionEngine.cpp + //record data loading from disk count, size, duration, IO speed + prometheus::Family &disk_load_duration_second_ = prometheus::BuildHistogram() + .Name("disk_load_duration_seconds") + .Help("Histogram of processing time for loading data from disk") + .Register(*registry_); + prometheus::Histogram &faiss_disk_load_duration_seconds_histogram_ = disk_load_duration_second_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0}); + + prometheus::Family &disk_load_size_bytes_ = prometheus::BuildHistogram() + .Name("disk_load_size_bytes") + .Help("Histogram of data size by bytes for loading data from disk") + .Register(*registry_); + prometheus::Histogram &faiss_disk_load_size_bytes_histogram_ = disk_load_size_bytes_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0}); + + prometheus::Family &disk_load_IO_speed_ = prometheus::BuildHistogram() + .Name("disk_load_IO_speed_byte_per_sec") + .Help("Histogram of IO speed for loading data from disk") + .Register(*registry_); + prometheus::Histogram &faiss_disk_load_IO_speed_histogram_ = disk_load_IO_speed_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0}); + + ////all from CacheMgr.cpp + //record cache access count + prometheus::Family &cache_access_ = prometheus::BuildCounter() + .Name("cache_access_total") + .Help("the count of accessing cache ") + .Register(*registry_); + prometheus::Counter &cache_access_total_ = cache_access_.Add({}); + +}; + +static MetricsBase& CreateMetricsCollector(MetricCollectorType collector_type) { + switch(collector_type) { + case MetricCollectorType::PROMETHEUS: + static PrometheusMetrics instance = PrometheusMetrics::GetInstance(); + return instance; + default: + return MetricsBase::GetInstance(); + } +} + +static MetricsBase& GetInstance(){ + ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC); + std::string collector_typr_str = config.GetValue(CONFIG_METRIC_COLLECTOR); + if(collector_typr_str == "prometheus") { + return CreateMetricsCollector(MetricCollectorType::PROMETHEUS); + } else if(collector_typr_str == "zabbix"){ + return CreateMetricsCollector(MetricCollectorType::ZABBIX); + } else { + return CreateMetricsCollector(MetricCollectorType::INVALID); + } +} + + +} +} +} + + + diff --git a/cpp/src/sdk/MegaSearch.cpp b/cpp/src/sdk/MegaSearch.cpp deleted file mode 100644 index 66c7dceb15..0000000000 --- a/cpp/src/sdk/MegaSearch.cpp +++ /dev/null @@ -1,96 +0,0 @@ -#include "MegaSearch.h" - - -namespace megasearch { -std::shared_ptr -Create() { - return nullptr; -} - -Status -Destroy(std::shared_ptr &connection_ptr) { - return Status::OK(); -} - -/** -Status -Connection::Connect(const ConnectParam ¶m) { - return Status::NotSupported("Connect interface is not supported."); -} - -Status -Connection::Connect(const std::string &uri) { - return Status::NotSupported("Connect interface is not supported."); -} - -Status -Connection::Connected() const { - return Status::NotSupported("Connected interface is not supported."); -} - -Status -Connection::Disconnect() { - return Status::NotSupported("Disconnect interface is not supported."); -} - -std::string -Connection::ClientVersion() const { - return std::string("Current Version"); -} - -Status -Connection::CreateTable(const TableSchema ¶m) { - return Status::NotSupported("Create table interface interface is not supported."); -} - -Status -Connection::CreateTablePartition(const CreateTablePartitionParam ¶m) { - return Status::NotSupported("Create table partition interface is not supported."); -} - -Status -Connection::DeleteTablePartition(const DeleteTablePartitionParam ¶m) { - return Status::NotSupported("Delete table partition interface is not supported."); -} - -Status -Connection::DeleteTable(const std::string &table_name) { - return Status::NotSupported("Create table interface is not supported."); -} - -Status -Connection::AddVector(const std::string &table_name, - const std::vector &record_array, - std::vector &id_array) { - return Status::NotSupported("Add vector array interface is not supported."); -} - -Status -Connection::SearchVector(const std::string &table_name, - const std::vector &query_record_array, - std::vector &topk_query_result_array, - int64_t topk) { - return Status::NotSupported("Query vector array interface is not supported."); -} - -Status -Connection::DescribeTable(const std::string &table_name, TableSchema &table_schema) { - return Status::NotSupported("Show table interface is not supported."); -} - -Status -Connection::ShowTables(std::vector &table_array) { - return Status::NotSupported("List table array interface is not supported."); -} - -std::string -Connection::ServerVersion() const { - return std::string("Server version."); -} - -std::string -Connection::ServerStatus() const { - return std::string("Server status"); -} -**/ -} \ No newline at end of file diff --git a/cpp/src/sdk/MegaSearch.h b/cpp/src/sdk/MegaSearch.h deleted file mode 100644 index d006927614..0000000000 --- a/cpp/src/sdk/MegaSearch.h +++ /dev/null @@ -1,339 +0,0 @@ -#pragma once - -#include "Status.h" - -#include -#include -#include -#include - -/** \brief MegaSearch SDK namespace - */ -namespace megasearch { - - -/** - * @brief Column Type - */ -enum class ColumnType { - invalid, - int8, - int16, - int32, - int64, - float32, - float64, - date, - vector -}; - -/** - * @brief Index Type - */ -enum class IndexType { - raw, - ivfflat -}; - -/** - * @brief Connect API parameter - */ -struct ConnectParam { - std::string ip_address; ///< Server IP address - std::string port; ///< Server PORT -}; - -/** - * @brief Table column description - */ -struct Column { - ColumnType type = ColumnType::invalid; ///< Column Type: enum ColumnType - std::string name; ///< Column name -}; - -/** - * @brief Table vector column description - */ -struct VectorColumn : public Column { - VectorColumn() { type = ColumnType::vector; } - int64_t dimension = 0; ///< Vector dimension - IndexType index_type = IndexType::raw; ///< Index type - bool store_raw_vector = false; ///< Is vector self stored in the table -}; - -/** - * @brief Table Schema - */ -struct TableSchema { - std::string table_name; ///< Table name - std::vector vector_column_array; ///< Vector column description - std::vector attribute_column_array; ///< Columns description - std::vector partition_column_name_array; ///< Partition column name -}; - -/** - * @brief Range information - */ -struct Range { - std::string start_value; ///< Range start - std::string end_value; ///< Range stop -}; - -/** - * @brief Create table partition parameters - */ -struct CreateTablePartitionParam { - std::string table_name; ///< Table name, vector/float32/float64 type column is not allowed for partition - std::string partition_name; ///< Partition name, created partition name - std::map range_map; ///< Column name to PartitionRange map -}; - - -/** - * @brief Delete table partition parameters - */ -struct DeleteTablePartitionParam { - std::string table_name; ///< Table name - std::vector partition_name_array; ///< Partition name array -}; - -/** - * @brief Record inserted - */ -struct RowRecord { - std::map> vector_map; ///< Vector columns - std::map attribute_map; ///< Other attribute columns -}; - -/** - * @brief Query record - */ -struct QueryRecord { - std::map> vector_map; ///< Query vectors - std::vector selected_column_array; ///< Output column array - std::map> partition_filter_column_map; ///< Range used to select partitions -}; - -/** - * @brief Query result - */ -struct QueryResult { - int64_t id; ///< Output result - double score; ///< Vector similarity score: 0 ~ 100 - std::map column_map; ///< Other column -}; - -/** - * @brief TopK query result - */ -struct TopKQueryResult { - std::vector query_result_arrays; ///< TopK query result -}; - -/** - * @brief SDK main class - */ -class Connection { - public: - - /** - * @brief CreateConnection - * - * Create a connection instance and return it's shared pointer - * - * @return Connection instance pointer - */ - - static std::shared_ptr - Create(); - - /** - * @brief DestroyConnection - * - * Destroy the connection instance - * - * @param connection, the shared pointer to the instance to be destroyed - * - * @return if destroy is successful - */ - - static Status - Destroy(std::shared_ptr connection_ptr); - - /** - * @brief Connect - * - * Connect function should be called before any operations - * Server will be connected after Connect return OK - * - * @param param, use to provide server information - * - * @return Indicate if connect is successful - */ - - virtual Status Connect(const ConnectParam ¶m) = 0; - - /** - * @brief Connect - * - * Connect function should be called before any operations - * Server will be connected after Connect return OK - * - * @param uri, use to provide server information, example: megasearch://ipaddress:port - * - * @return Indicate if connect is successful - */ - virtual Status Connect(const std::string &uri) = 0; - - /** - * @brief connected - * - * Connection status. - * - * @return Indicate if connection status - */ - virtual Status Connected() const = 0; - - /** - * @brief Disconnect - * - * Server will be disconnected after Disconnect return OK - * - * @return Indicate if disconnect is successful - */ - virtual Status Disconnect() = 0; - - - /** - * @brief Create table method - * - * This method is used to create table - * - * @param param, use to provide table information to be created. - * - * @return Indicate if table is created successfully - */ - virtual Status CreateTable(const TableSchema ¶m) = 0; - - - /** - * @brief Delete table method - * - * This method is used to delete table. - * - * @param table_name, table name is going to be deleted. - * - * @return Indicate if table is delete successfully. - */ - virtual Status DeleteTable(const std::string &table_name) = 0; - - - /** - * @brief Create table partition - * - * This method is used to create table partition. - * - * @param param, use to provide partition information to be created. - * - * @return Indicate if table partition is created successfully. - */ - virtual Status CreateTablePartition(const CreateTablePartitionParam ¶m) = 0; - - - /** - * @brief Delete table partition - * - * This method is used to delete table partition. - * - * @param param, use to provide partition information to be deleted. - * - * @return Indicate if table partition is delete successfully. - */ - virtual Status DeleteTablePartition(const DeleteTablePartitionParam ¶m) = 0; - - - /** - * @brief Add vector to table - * - * This method is used to add vector array to table. - * - * @param table_name, table_name is inserted. - * @param record_array, vector array is inserted. - * @param id_array, after inserted every vector is given a id. - * - * @return Indicate if vector array are inserted successfully - */ - virtual Status AddVector(const std::string &table_name, - const std::vector &record_array, - std::vector &id_array) = 0; - - - /** - * @brief Search vector - * - * This method is used to query vector in table. - * - * @param table_name, table_name is queried. - * @param query_record_array, all vector are going to be queried. - * @param topk_query_result_array, result array. - * @param topk, how many similarity vectors will be searched. - * - * @return Indicate if query is successful. - */ - virtual Status SearchVector(const std::string &table_name, - const std::vector &query_record_array, - std::vector &topk_query_result_array, - int64_t topk) = 0; - - /** - * @brief Show table description - * - * This method is used to show table information. - * - * @param table_name, which table is show. - * @param table_schema, table_schema is given when operation is successful. - * - * @return Indicate if this operation is successful. - */ - virtual Status DescribeTable(const std::string &table_name, TableSchema &table_schema) = 0; - - /** - * @brief Show all tables in database - * - * This method is used to list all tables. - * - * @param table_array, all tables are push into the array. - * - * @return Indicate if this operation is successful. - */ - virtual Status ShowTables(std::vector &table_array) = 0; - - /** - * @brief Give the client version - * - * This method is used to give the client version. - * - * @return Client version. - */ - virtual std::string ClientVersion() const = 0; - - /** - * @brief Give the server version - * - * This method is used to give the server version. - * - * @return Server version. - */ - virtual std::string ServerVersion() const = 0; - - /** - * @brief Give the server status - * - * This method is used to give the server status. - * - * @return Server status. - */ - virtual std::string ServerStatus() const = 0; -}; - -} \ No newline at end of file diff --git a/cpp/src/sdk/Status.cpp b/cpp/src/sdk/Status.cpp deleted file mode 100644 index 8e82affd2a..0000000000 --- a/cpp/src/sdk/Status.cpp +++ /dev/null @@ -1,115 +0,0 @@ -#include "Status.h" - - -namespace megasearch { - -Status::~Status() noexcept { - if (state_ != nullptr) { - delete state_; - state_ = nullptr; - } -} - -static inline std::ostream &operator<<(std::ostream &os, const Status &x) { - os << x.ToString(); - return os; -} - -void Status::MoveFrom(Status &s) { - delete state_; - state_ = s.state_; - s.state_ = nullptr; -} - -Status::Status(const Status &s) - : state_((s.state_ == nullptr) ? nullptr : new State(*s.state_)) {} - -Status &Status::operator=(const Status &s) { - if (state_ != s.state_) { - CopyFrom(s); - } - return *this; -} - -Status &Status::operator=(Status &&s) noexcept { - MoveFrom(s); - return *this; -} - -Status Status::operator&(const Status &status) const noexcept { - if (ok()) { - return status; - } else { - return *this; - } -} - -Status Status::operator&(Status &&s) const noexcept { - if (ok()) { - return std::move(s); - } else { - return *this; - } -} - -Status &Status::operator&=(const Status &s) noexcept { - if (ok() && !s.ok()) { - CopyFrom(s); - } - return *this; -} - -Status &Status::operator&=(Status &&s) noexcept { - if (ok() && !s.ok()) { - MoveFrom(s); - } - return *this; -} - -Status::Status(StatusCode code, const std::string &message) { - state_ = new State; - state_->code = code; - state_->message = message; -} - -void Status::CopyFrom(const Status &status) { - delete state_; - if (status.state_ == nullptr) { - state_ = nullptr; - } else { - state_ = new State(*status.state_); - } -} - -std::string Status::CodeAsString() const { - if (state_ == nullptr) { - return "OK"; - } - - const char *type = nullptr; - switch (code()) { - case StatusCode::OK: type = "OK"; - break; - case StatusCode::Invalid: type = "Invalid"; - break; - case StatusCode::UnknownError: type = "Unknown error"; - break; - case StatusCode::NotSupported: type = "Not Supported"; - break; - default: type = "Unknown"; - break; - } - return std::string(type); -} - -std::string Status::ToString() const { - std::string result(CodeAsString()); - if (state_ == nullptr) { - return result; - } - result += ": "; - result += state_->message; - return result; -} - -} \ No newline at end of file diff --git a/cpp/src/sdk/Status.h b/cpp/src/sdk/Status.h deleted file mode 100644 index 25f9ac8c1d..0000000000 --- a/cpp/src/sdk/Status.h +++ /dev/null @@ -1,325 +0,0 @@ -#pragma once - -#include -#include - -/** \brief MegaSearch SDK namespace - */ -namespace megasearch { - -/** - * @brief Status Code for SDK interface return - */ -enum class StatusCode { - OK = 0, - Invalid = 1, - UnknownError = 2, - NotSupported = 3 -}; - -/** - * @brief Status for SDK interface return - */ -class Status { - public: - /** - * @brief Status - * - * Default constructor. - * - */ - Status() = default; - - /** - * @brief Status - * - * Destructor. - * - */ - ~Status() noexcept; - - /** - * @brief Status - * - * Constructor - * - * @param code, status code. - * @param message, status message. - * - */ - Status(StatusCode code, const std::string &message); - - /** - * @brief Status - * - * Copy constructor - * - * @param status, status to be copied. - * - */ - inline Status(const Status &status); - - /** - * @brief Status - * - * Assignment operator - * - * @param status, status to be copied. - * @return, the status is assigned. - * - */ - inline Status &operator=(const Status &s); - - /** - * @brief Status - * - * Move constructor - * - * @param status, status to be moved. - * - */ - inline Status(Status &&s) noexcept : state_(s.state_) {}; - - /** - * @brief Status - * - * Move assignment operator - * - * @param status, status to be moved. - * @return, the status is moved. - * - */ - inline Status &operator=(Status &&s) noexcept; - - /** - * @brief Status - * - * AND operator - * - * @param status, status to be AND. - * @return, the status after AND operation. - * - */ - inline Status operator&(const Status &s) const noexcept; - - /** - * @brief Status - * - * AND operator - * - * @param status, status to be AND. - * @return, the status after AND operation. - * - */ - inline Status operator&(Status &&s) const noexcept; - - /** - * @brief Status - * - * AND operator - * - * @param status, status to be AND. - * @return, the status after AND operation. - * - */ - inline Status &operator&=(const Status &s) noexcept; - - /** - * @brief Status - * - * AND operator - * - * @param status, status to be AND. - * @return, the status after AND operation. - * - */ - inline Status &operator&=(Status &&s) noexcept; - - /** - * @brief OK - * - * static OK status constructor - * - * @return, the status with OK. - * - */ - static Status OK() { return Status(); } - - /** - * @brief OK - * - * static OK status constructor with a specific message - * - * @param, serveral specific messages - * @return, the status with OK. - * - */ - template - static Status OK(Args &&... args) { - return Status(StatusCode::OK, MessageBuilder(std::forward(args)...)); - } - - /** - * @brief Invalid - * - * static Invalid status constructor with a specific message - * - * @param, serveral specific messages - * @return, the status with Invalid. - * - */ - template - static Status Invalid(Args &&... args) { - return Status(StatusCode::Invalid, - MessageBuilder(std::forward(args)...)); - } - - /** - * @brief Unknown Error - * - * static unknown error status constructor with a specific message - * - * @param, serveral specific messages - * @return, the status with unknown error. - * - */ - template - static Status UnknownError(Args &&... args) { - return Status(StatusCode::UnknownError, MessageBuilder(std::forward(args)...)); - } - - /** - * @brief not supported Error - * - * static not supported status constructor with a specific message - * - * @param, serveral specific messages - * @return, the status with not supported error. - * - */ - template - static Status NotSupported(Args &&... args) { - return Status(StatusCode::NotSupported, MessageBuilder(std::forward(args)...)); - } - - /** - * @brief ok - * - * Return true iff the status indicates success. - * - * @return, if the status indicates success. - * - */ - bool ok() const { return (state_ == nullptr); } - - /** - * @brief IsInvalid - * - * Return true iff the status indicates invalid. - * - * @return, if the status indicates invalid. - * - */ - bool IsInvalid() const { return code() == StatusCode::Invalid; } - - /** - * @brief IsUnknownError - * - * Return true iff the status indicates unknown error. - * - * @return, if the status indicates unknown error. - * - */ - bool IsUnknownError() const { return code() == StatusCode::UnknownError; } - - /** - * @brief IsNotSupported - * - * Return true iff the status indicates not supported. - * - * @return, if the status indicates not supported. - * - */ - bool IsNotSupported() const { return code() == StatusCode::NotSupported; } - - /** - * @brief ToString - * - * Return error message string. - * - * @return, error message string. - * - */ - std::string ToString() const; - - /** - * @brief CodeAsString - * - * Return a string representation of the status code. - * - * @return, a string representation of the status code. - * - */ - std::string CodeAsString() const; - - /** - * @brief code - * - * Return the StatusCode value attached to this status. - * - * @return, the status code value attached to this status. - * - */ - StatusCode code() const { return ok() ? StatusCode::OK : state_->code; } - - /** - * @brief message - * - * Return the specific error message attached to this status. - * - * @return, the specific error message attached to this status. - * - */ - std::string message() const { return ok() ? "" : state_->message; } - - private: - struct State { - StatusCode code; - std::string message; - }; - - // OK status has a `nullptr` state_. Otherwise, `state_` points to - // a `State` structure containing the error code and message. - State *state_ = nullptr; - - void DeleteState() { - delete state_; - state_ = nullptr; - } - - void CopyFrom(const Status &s); - - inline void MoveFrom(Status &s); - - template - static void MessageBuilderRecursive(std::stringstream &stream, Head &&head) { - stream << head; - } - - template - static void MessageBuilderRecursive(std::stringstream &stream, Head &&head, Tail &&... tail) { - MessageBuilderRecursive(stream, std::forward(head)); - MessageBuilderRecursive(stream, std::forward(tail)...); - } - - template - static std::string MessageBuilder(Args &&... args) { - std::stringstream stream; - - MessageBuilderRecursive(stream, std::forward(args)...); - - return stream.str(); - } -}; - -} \ No newline at end of file diff --git a/cpp/src/server/RocksIdMapper.cpp b/cpp/src/server/RocksIdMapper.cpp index 386058f00e..2dba544243 100644 --- a/cpp/src/server/RocksIdMapper.cpp +++ b/cpp/src/server/RocksIdMapper.cpp @@ -18,8 +18,6 @@ namespace zilliz { namespace vecwise { namespace server { -static const std::string ROCKSDB_DEFAULT_GROUP = "default"; - RocksIdMapper::RocksIdMapper() : db_(nullptr) { OpenDb(); @@ -30,8 +28,6 @@ RocksIdMapper::~RocksIdMapper() { } void RocksIdMapper::OpenDb() { - std::lock_guard lck(db_mutex_); - if(db_) { return; } @@ -83,8 +79,6 @@ void RocksIdMapper::OpenDb() { } void RocksIdMapper::CloseDb() { - std::lock_guard lck(db_mutex_); - for(auto& iter : column_handles_) { delete iter.second; } @@ -96,117 +90,7 @@ void RocksIdMapper::CloseDb() { } } -ServerError RocksIdMapper::AddGroup(const std::string& group) { - std::lock_guard lck(db_mutex_); - - return AddGroupInternal(group); -} - -bool RocksIdMapper::IsGroupExist(const std::string& group) const { - std::lock_guard lck(db_mutex_); - - return IsGroupExistInternal(group); -} - - ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) { - std::lock_guard lck(db_mutex_); - - return PutInternal(nid, sid, group); -} - -ServerError RocksIdMapper::Put(const std::vector& nid, const std::vector& sid, const std::string& group) { - if(nid.size() != sid.size()) { - return SERVER_INVALID_ARGUMENT; - } - - std::lock_guard lck(db_mutex_); - ServerError err = SERVER_SUCCESS; - for(size_t i = 0; i < nid.size(); i++) { - err = PutInternal(nid[i], sid[i], group); - if(err != SERVER_SUCCESS) { - return err; - } - } - - return err; -} - -ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const { - std::lock_guard lck(db_mutex_); - - return GetInternal(nid, sid, group); -} - -ServerError RocksIdMapper::Get(const std::vector& nid, std::vector& sid, const std::string& group) const { - sid.clear(); - - std::lock_guard lck(db_mutex_); - - ServerError err = SERVER_SUCCESS; - for(size_t i = 0; i < nid.size(); i++) { - std::string str_id; - ServerError temp_err = GetInternal(nid[i], str_id, group); - if(temp_err != SERVER_SUCCESS) { - sid.push_back(""); - SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i]; - err = temp_err; - continue; - } - - sid.push_back(str_id); - } - - return err; -} - -ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) { - std::lock_guard lck(db_mutex_); - - return DeleteInternal(nid, group); -} - -ServerError RocksIdMapper::DeleteGroup(const std::string& group) { - std::lock_guard lck(db_mutex_); - - return DeleteGroupInternal(group); -} - -//internal methods(whitout lock) -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -ServerError RocksIdMapper::AddGroupInternal(const std::string& group) { - if(!IsGroupExistInternal(group)) { - if(db_ == nullptr) { - return SERVER_NULL_POINTER; - } - - try {//add group - rocksdb::ColumnFamilyHandle *cfh = nullptr; - rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh); - if (!s.ok()) { - SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString(); - return SERVER_UNEXPECTED_ERROR; - } else { - column_handles_.insert(std::make_pair(group, cfh)); - } - } catch(std::exception& ex) { - SERVER_LOG_ERROR << "ID mapper failed to create group: " << ex.what(); - return SERVER_UNEXPECTED_ERROR; - } - } - - return SERVER_SUCCESS; -} - -bool RocksIdMapper::IsGroupExistInternal(const std::string& group) const { - std::string group_name = group; - if(group_name.empty()){ - group_name = ROCKSDB_DEFAULT_GROUP; - } - return (column_handles_.count(group_name) > 0 && column_handles_[group_name] != nullptr); -} - -ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string& sid, const std::string& group) { if(db_ == nullptr) { return SERVER_NULL_POINTER; } @@ -220,12 +104,22 @@ ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string return SERVER_UNEXPECTED_ERROR; } } else { - //try create group - if(AddGroupInternal(group) != SERVER_SUCCESS){ - return SERVER_UNEXPECTED_ERROR; + rocksdb::ColumnFamilyHandle *cfh = nullptr; + if(column_handles_.count(group) == 0) { + try {//add group + rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh); + if (!s.ok()) { + SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString(); + } else { + column_handles_.insert(std::make_pair(group, cfh)); + } + } catch(std::exception& ex) { + std::cout << ex.what() << std::endl; + } + } else { + cfh = column_handles_[group]; } - rocksdb::ColumnFamilyHandle *cfh = column_handles_[group]; rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), cfh, key, value); if (!s.ok()) { SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString(); @@ -236,7 +130,23 @@ ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string return SERVER_SUCCESS; } -ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid, const std::string& group) const { +ServerError RocksIdMapper::Put(const std::vector& nid, const std::vector& sid, const std::string& group) { + if(nid.size() != sid.size()) { + return SERVER_INVALID_ARGUMENT; + } + + ServerError err = SERVER_SUCCESS; + for(size_t i = 0; i < nid.size(); i++) { + err = Put(nid[i], sid[i], group); + if(err != SERVER_SUCCESS) { + return err; + } + } + + return err; +} + +ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const { sid = ""; if(db_ == nullptr) { return SERVER_NULL_POINTER; @@ -263,8 +173,28 @@ ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid, return SERVER_SUCCESS; } -ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::string& group) { - if(db_ == nullptr) { +ServerError RocksIdMapper::Get(const std::vector& nid, std::vector& sid, const std::string& group) const { + sid.clear(); + + ServerError err = SERVER_SUCCESS; + for(size_t i = 0; i < nid.size(); i++) { + std::string str_id; + ServerError temp_err = Get(nid[i], str_id, group); + if(temp_err != SERVER_SUCCESS) { + sid.push_back(""); + SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i]; + err = temp_err; + continue; + } + + sid.push_back(str_id); + } + + return err; +} + +ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) { + if(db_ == nullptr) { return SERVER_NULL_POINTER; } @@ -288,7 +218,7 @@ ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::str return SERVER_SUCCESS; } -ServerError RocksIdMapper::DeleteGroupInternal(const std::string& group) { +ServerError RocksIdMapper::DeleteGroup(const std::string& group) { if(db_ == nullptr) { return SERVER_NULL_POINTER; } diff --git a/cpp/src/server/RocksIdMapper.h b/cpp/src/server/RocksIdMapper.h index 5fc4667e75..8c73155903 100644 --- a/cpp/src/server/RocksIdMapper.h +++ b/cpp/src/server/RocksIdMapper.h @@ -13,7 +13,6 @@ #include #include #include -#include namespace zilliz { namespace vecwise { @@ -24,9 +23,6 @@ public: RocksIdMapper(); ~RocksIdMapper(); - ServerError AddGroup(const std::string& group) override; - bool IsGroupExist(const std::string& group) const override; - ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override; ServerError Put(const std::vector& nid, const std::vector& sid, const std::string& group = "") override; @@ -40,22 +36,9 @@ private: void OpenDb(); void CloseDb(); - ServerError AddGroupInternal(const std::string& group); - - bool IsGroupExistInternal(const std::string& group) const; - - ServerError PutInternal(const std::string& nid, const std::string& sid, const std::string& group); - - ServerError GetInternal(const std::string& nid, std::string& sid, const std::string& group) const; - - ServerError DeleteInternal(const std::string& nid, const std::string& group); - - ServerError DeleteGroupInternal(const std::string& group); - private: rocksdb::DB* db_; - mutable std::unordered_map column_handles_; - mutable std::mutex db_mutex_; + std::unordered_map column_handles_; }; } diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 3f51929baf..eb05d74a41 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -19,6 +19,8 @@ #include #include +#include "metrics/Metrics.h" + namespace zilliz { namespace vecwise { namespace server { @@ -133,6 +135,10 @@ Server::Daemonize() { int Server::Start() { +// server::Metrics::GetInstance().Init(); +// server::Metrics::GetInstance().exposer_ptr()->RegisterCollectable(server::Metrics::GetInstance().registry_ptr()); + METRICS_INSTANCE.Init(); + if (daemonized_) { Daemonize(); } @@ -160,8 +166,10 @@ Server::Start() { exit(1); } - std::thread counting_down(&server::LicenseCheck::StartCountingDown, license_file_path); - counting_down.detach(); + if(server::LicenseCheck::StartCountingDown(license_file_path) != SERVER_SUCCESS) { + SERVER_LOG_ERROR << "License counter start error"; + exit(1); + } #endif // Handle Signal diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index d840077e15..a630442c98 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -35,6 +35,10 @@ static const std::string CONFIG_GPU_CACHE_CAPACITY = "gpu_cache_capacity"; static const std::string CONFIG_LICENSE = "license_config"; static const std::string CONFIG_LICENSE_PATH = "license_path"; +static const std::string CONFIG_METRIC = "metric_config"; +static const std::string CONFIG_METRIC_IS_STARTUP = "is_startup"; +static const std::string CONFIG_METRIC_COLLECTOR = "collector"; + class ServerConfig { public: static ServerConfig &GetInstance(); diff --git a/cpp/src/server/VecIdMapper.cpp b/cpp/src/server/VecIdMapper.cpp index b8bea7b348..ecf5058a4b 100644 --- a/cpp/src/server/VecIdMapper.cpp +++ b/cpp/src/server/VecIdMapper.cpp @@ -39,17 +39,6 @@ SimpleIdMapper::~SimpleIdMapper() { } -ServerError SimpleIdMapper::AddGroup(const std::string& group) { - if(id_groups_.count(group) == 0) { - id_groups_.insert(std::make_pair(group, ID_MAPPING())); - } -} - -//not thread-safe -bool SimpleIdMapper::IsGroupExist(const std::string& group) const { - return id_groups_.count(group) > 0; -} - //not thread-safe ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) { ID_MAPPING& mapping = id_groups_[group]; diff --git a/cpp/src/server/VecIdMapper.h b/cpp/src/server/VecIdMapper.h index f3c2bdde27..9bb6d500da 100644 --- a/cpp/src/server/VecIdMapper.h +++ b/cpp/src/server/VecIdMapper.h @@ -25,9 +25,6 @@ public: virtual ~IVecIdMapper(){} - virtual ServerError AddGroup(const std::string& group) = 0; - virtual bool IsGroupExist(const std::string& group) const = 0; - virtual ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") = 0; virtual ServerError Put(const std::vector& nid, const std::vector& sid, const std::string& group = "") = 0; @@ -44,9 +41,6 @@ public: SimpleIdMapper(); ~SimpleIdMapper(); - ServerError AddGroup(const std::string& group) override; - bool IsGroupExist(const std::string& group) const override; - ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override; ServerError Put(const std::vector& nid, const std::vector& sid, const std::string& group = "") override; diff --git a/cpp/src/server/VecServiceTask.cpp b/cpp/src/server/VecServiceTask.cpp index c2435ff51b..9ebe2c1bdf 100644 --- a/cpp/src/server/VecServiceTask.cpp +++ b/cpp/src/server/VecServiceTask.cpp @@ -87,7 +87,6 @@ BaseTaskPtr AddGroupTask::Create(int32_t dimension, ServerError AddGroupTask::OnExecute() { try { - IVecIdMapper::GetInstance()->AddGroup(group_id_); engine::meta::GroupSchema group_info; group_info.dimension = (size_t)dimension_; group_info.group_id = group_id_; @@ -244,13 +243,6 @@ const AttribMap& AddVectorTask::GetVecAttrib() const { ServerError AddVectorTask::OnExecute() { try { - if(!IVecIdMapper::GetInstance()->IsGroupExist(group_id_)) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = "group not exist"; - SERVER_LOG_ERROR << error_msg_; - return error_code_; - } - uint64_t vec_dim = GetVecDimension(); std::vector vec_f; vec_f.resize(vec_dim); @@ -488,14 +480,10 @@ ServerError AddBatchVectorTask::OnExecute() { std::list> threads_list; uint64_t begin_index = 0, end_index = USE_MT; - while(true) { + while(end_index < vec_count) { threads_list.push_back( GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping, this, vector_ids, begin_index, end_index, tensor_ids_)); - if(end_index >= vec_count) { - break; - } - begin_index = end_index; end_index += USE_MT; if(end_index > vec_count) {