From a68fa43ddd238af805a767b2d233acd30ff548e3 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Fri, 9 Aug 2019 15:46:32 +0800 Subject: [PATCH 1/3] grpc and thrift server run concurrently Former-commit-id: 4abfccaf6ee598bbb7cc47d48f943b8ca9726f12 --- cpp/cmake/DefineOptions.cmake | 2 +- cpp/cmake/ThirdPartyPackages.cmake | 4 +- cpp/scripts/start_server.sh | 2 +- cpp/src/CMakeLists.txt | 51 ++++++---- cpp/src/server/Server.cpp | 11 ++- ...{MilvusServer.cpp => GrpcMilvusServer.cpp} | 12 +-- .../{MilvusServer.h => GrpcMilvusServer.h} | 2 +- ...uestHandler.cpp => GrpcRequestHandler.cpp} | 92 +++++++++---------- ...{RequestHandler.h => GrpcRequestHandler.h} | 2 +- ...Scheduler.cpp => GrpcRequestScheduler.cpp} | 28 +++--- ...uestScheduler.h => GrpcRequestScheduler.h} | 18 ++-- .../{RequestTask.cpp => GrpcRequestTask.cpp} | 44 ++++----- .../{RequestTask.h => GrpcRequestTask.h} | 22 ++--- cpp/src/server/thrift_impl/RequestHandler.cpp | 30 +++--- 14 files changed, 166 insertions(+), 154 deletions(-) rename cpp/src/server/grpc_impl/{MilvusServer.cpp => GrpcMilvusServer.cpp} (92%) rename cpp/src/server/grpc_impl/{MilvusServer.h => GrpcMilvusServer.h} (95%) rename cpp/src/server/grpc_impl/{RequestHandler.cpp => GrpcRequestHandler.cpp} (54%) rename cpp/src/server/grpc_impl/{RequestHandler.h => GrpcRequestHandler.h} (98%) rename cpp/src/server/grpc_impl/{RequestScheduler.cpp => GrpcRequestScheduler.cpp} (90%) rename cpp/src/server/grpc_impl/{RequestScheduler.h => GrpcRequestScheduler.h} (83%) rename cpp/src/server/grpc_impl/{RequestTask.cpp => GrpcRequestTask.cpp} (95%) rename cpp/src/server/grpc_impl/{RequestTask.h => GrpcRequestTask.h} (91%) diff --git a/cpp/cmake/DefineOptions.cmake b/cpp/cmake/DefineOptions.cmake index d0624bd0fd..af89dccb4b 100644 --- a/cpp/cmake/DefineOptions.cmake +++ b/cpp/cmake/DefineOptions.cmake @@ -90,7 +90,7 @@ define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON) define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON) -define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" OFF) +define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" ON) define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON) diff --git a/cpp/cmake/ThirdPartyPackages.cmake b/cpp/cmake/ThirdPartyPackages.cmake index 6846275edf..ade4b4a837 100644 --- a/cpp/cmake/ThirdPartyPackages.cmake +++ b/cpp/cmake/ThirdPartyPackages.cmake @@ -2640,7 +2640,7 @@ macro(build_grpc) add_dependencies(grpc_protoc grpc_ep) endmacro() -if(NOT MILVUS_WITH_THRIFT STREQUAL "ON") +#if(NOT MILVUS_WITH_THRIFT STREQUAL "ON") resolve_dependency(GRPC) get_target_property(GRPC_INCLUDE_DIR grpc INTERFACE_INCLUDE_DIRECTORIES) @@ -2651,4 +2651,4 @@ if(NOT MILVUS_WITH_THRIFT STREQUAL "ON") include_directories(SYSTEM ${GRPC_THIRD_PARTY_DIR}/protobuf/src) link_directories(SYSTEM ${GRPC_PROTOBUF_LIB_DIR}) -endif() +#endif() diff --git a/cpp/scripts/start_server.sh b/cpp/scripts/start_server.sh index 312cef86d6..72e120f2ce 100755 --- a/cpp/scripts/start_server.sh +++ b/cpp/scripts/start_server.sh @@ -1,4 +1,4 @@ #!/bin/bash -../bin/milvus_server -c ../conf/server_config.yaml -l ../conf/log_config.conf +../bin/milvus_grpc_server -c ../conf/server_config.yaml -l ../conf/log_config.conf diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 50e09e8c3f..b84dde009c 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -4,7 +4,6 @@ # Proprietary and confidential. #------------------------------------------------------------------------------- - aux_source_directory(cache cache_files) aux_source_directory(config config_files) aux_source_directory(server server_files) @@ -78,17 +77,17 @@ include_directories(/usr/include/mysql) include_directories(grpc/gen-status) include_directories(grpc/gen-milvus) -if (MILVUS_WITH_THRIFT STREQUAL "ON") - set(client_lib +#if (MILVUS_WITH_THRIFT STREQUAL "ON") + set(client_thrift_lib thrift) -else() - set(client_lib +#else() + set(client_grpc_lib grpcpp_channelz grpc++ grpc grpc_protobuf grpc_protoc) -endif() +#endif() set(third_party_libs knowhere @@ -100,7 +99,8 @@ set(third_party_libs lapack easyloggingpp sqlite - ${client_lib} + ${client_thrift_lib} + ${client_grpc_lib} yaml-cpp prometheus-cpp-push prometheus-cpp-pull @@ -197,7 +197,7 @@ set(knowhere_libs tbb ) -if (MILVUS_WITH_THRIFT STREQUAL "ON") +#if (MILVUS_WITH_THRIFT STREQUAL "ON") add_executable(milvus_thrift_server ${config_files} ${server_files} @@ -206,7 +206,7 @@ if (MILVUS_WITH_THRIFT STREQUAL "ON") ${thrift_service_files} ${metrics_files} ) -else() +#else() add_executable(milvus_grpc_server ${config_files} ${server_files} @@ -215,7 +215,15 @@ else() ${grpc_service_files} ${metrics_files} ) -endif() +#endif() + add_executable(milvus_server + ${config_files} + ${server_files} + ${thriftserver_files} + ${grpcserver_files} + ${utils_files} + ${thrift_service_files} + ${metrics_files}) if (ENABLE_LICENSE STREQUAL "ON") add_executable(get_sys_info ${get_sys_info_files}) @@ -224,25 +232,28 @@ if (ENABLE_LICENSE STREQUAL "ON") target_link_libraries(get_sys_info ${license_libs} license_check ${third_party_libs}) target_link_libraries(license_generator ${license_libs} ${third_party_libs}) - if(MILVUS_WITH_THRIFT STREQUAL "ON") +# if(MILVUS_WITH_THRIFT STREQUAL "ON") target_link_libraries(milvus_thrift_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) - else() +# else() target_link_libraries(milvus_grpc_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) - endif() +# endif() + target_link_libraries(milvus_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) + else () - if(MILVUS_WITH_THRIFT STREQUAL "ON") +# if(MILVUS_WITH_THRIFT STREQUAL "ON") target_link_libraries(milvus_thrift_server ${server_libs} ${knowhere_libs} ${third_party_libs}) - else() +# else() target_link_libraries(milvus_grpc_server ${server_libs} ${knowhere_libs} ${third_party_libs}) - endif() - +# endif() + target_link_libraries(milvus_server ${server_libs} ${knowhere_libs} ${third_party_libs}) endif() -if (MILVUS_WITH_THRIFT STREQUAL "ON") +#if (MILVUS_WITH_THRIFT STREQUAL "ON") install(TARGETS milvus_thrift_server DESTINATION bin) -else() +#else() install(TARGETS milvus_grpc_server DESTINATION bin) -endif() +#endif() + install(TARGETS milvus_server DESTINATION bin) install(FILES ${KNOWHERE_BUILD_DIR}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}tbb${CMAKE_SHARED_LIBRARY_SUFFIX} diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 4bfe1f76ab..13218faae2 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -5,11 +5,11 @@ //////////////////////////////////////////////////////////////////////////////// #include "Server.h" //#include "ServerConfig.h" -#ifdef MILVUS_ENABLE_THRIFT +//#ifdef MILVUS_ENABLE_THRIFT #include "server/thrift_impl/MilvusServer.h" -#else -#include "server/grpc_impl/MilvusServer.h" -#endif +//#else +#include "server/grpc_impl/GrpcMilvusServer.h" +//#endif #include "utils/Log.h" #include "utils/SignalUtil.h" @@ -225,11 +225,12 @@ Server::LoadConfig() { void Server::StartService() { MilvusServer::StartService(); + GrpcMilvusServer::StartService(); } void Server::StopService() { - MilvusServer::StopService(); + GrpcMilvusServer::StopService(); } } diff --git a/cpp/src/server/grpc_impl/MilvusServer.cpp b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp similarity index 92% rename from cpp/src/server/grpc_impl/MilvusServer.cpp rename to cpp/src/server/grpc_impl/GrpcMilvusServer.cpp index bd8cfd7f09..98631a5dda 100644 --- a/cpp/src/server/grpc_impl/MilvusServer.cpp +++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp @@ -4,12 +4,12 @@ * Proprietary and confidential. ******************************************************************************/ #include "milvus.grpc.pb.h" -#include "MilvusServer.h" +#include "GrpcMilvusServer.h" #include "../ServerConfig.h" #include "../DBWrapper.h" #include "utils/Log.h" #include "faiss/utils.h" -#include "RequestHandler.h" +#include "GrpcRequestHandler.h" #include #include @@ -34,7 +34,7 @@ static std::unique_ptr server; constexpr long MESSAGE_SIZE = -1; void -MilvusServer::StartService() { +GrpcMilvusServer::StartService() { if (server != nullptr){ std::cout << "stopservice!\n"; StopService(); @@ -44,7 +44,7 @@ MilvusServer::StartService() { ConfigNode server_config = config.GetConfig(CONFIG_SERVER); ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE); std::string address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1"); - int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19530); + int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19531); faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20); @@ -60,7 +60,7 @@ MilvusServer::StartService() { builder.SetDefaultCompressionAlgorithm(GRPC_COMPRESS_STREAM_GZIP); builder.SetDefaultCompressionLevel(GRPC_COMPRESS_LEVEL_HIGH); - RequestHandler service; + GrpcRequestHandler service; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); @@ -71,7 +71,7 @@ MilvusServer::StartService() { } void -MilvusServer::StopService() { +GrpcMilvusServer::StopService() { if (server != nullptr) { server->Shutdown(); } diff --git a/cpp/src/server/grpc_impl/MilvusServer.h b/cpp/src/server/grpc_impl/GrpcMilvusServer.h similarity index 95% rename from cpp/src/server/grpc_impl/MilvusServer.h rename to cpp/src/server/grpc_impl/GrpcMilvusServer.h index 82dcd64ce5..4a3eec689f 100644 --- a/cpp/src/server/grpc_impl/MilvusServer.h +++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.h @@ -11,7 +11,7 @@ namespace zilliz { namespace milvus { namespace server { -class MilvusServer { +class GrpcMilvusServer { public: static void StartService(); diff --git a/cpp/src/server/grpc_impl/RequestHandler.cpp b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp similarity index 54% rename from cpp/src/server/grpc_impl/RequestHandler.cpp rename to cpp/src/server/grpc_impl/GrpcRequestHandler.cpp index 7a582f91eb..3d9fd01398 100644 --- a/cpp/src/server/grpc_impl/RequestHandler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -4,8 +4,8 @@ * Proprietary and confidential. ******************************************************************************/ -#include "RequestHandler.h" -#include "RequestTask.h" +#include "GrpcRequestHandler.h" +#include "GrpcRequestTask.h" #include "utils/TimeRecorder.h" namespace zilliz { @@ -13,24 +13,24 @@ namespace milvus { namespace server { ::grpc::Status -RequestHandler::CreateTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableSchema *request, - ::milvus::grpc::Status *response) { +GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableSchema *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = CreateTableTask::Create(*request); - RequestScheduler::ExecTask(task_ptr, response); + GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::HasTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::BoolReply *response) { +GrpcRequestHandler::HasTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, + ::milvus::grpc::BoolReply *response) { bool has_table = false; BaseTaskPtr task_ptr = HasTableTask::Create(request->table_name(), has_table); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->set_bool_reply(has_table); response->mutable_status()->set_reason(grpc_status.reason()); response->mutable_status()->set_error_code(grpc_status.error_code()); @@ -38,47 +38,47 @@ RequestHandler::HasTable(::grpc::ServerContext *context, } ::grpc::Status -RequestHandler::DropTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::Status* response) { +GrpcRequestHandler::DropTable(::grpc::ServerContext* context, + const ::milvus::grpc::TableName* request, + ::milvus::grpc::Status* response) { BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name()); - RequestScheduler::ExecTask(task_ptr, response); + GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::BuildIndex(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::Status* response) { +GrpcRequestHandler::BuildIndex(::grpc::ServerContext* context, + const ::milvus::grpc::TableName* request, + ::milvus::grpc::Status* response) { BaseTaskPtr task_ptr = BuildIndexTask::Create(request->table_name()); - RequestScheduler::ExecTask(task_ptr, response); + GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::InsertVector(::grpc::ServerContext* context, - const ::milvus::grpc::InsertInfos* request, - ::milvus::grpc::VectorIds* response) { +GrpcRequestHandler::InsertVector(::grpc::ServerContext* context, + const ::milvus::grpc::InsertInfos* request, + ::milvus::grpc::VectorIds* response) { BaseTaskPtr task_ptr = InsertVectorTask::Create(*request, *response); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->mutable_status()->set_reason(grpc_status.reason()); response->mutable_status()->set_error_code(grpc_status.error_code()); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::SearchVector(::grpc::ServerContext* context, - const ::milvus::grpc::SearchVectorInfos* request, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { +GrpcRequestHandler::SearchVector(::grpc::ServerContext* context, + const ::milvus::grpc::SearchVectorInfos* request, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { std::vector file_id_array; BaseTaskPtr task_ptr = SearchVectorTask::Create(*request, file_id_array, *writer); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); if (grpc_status.error_code() != SERVER_SUCCESS) { ::grpc::Status status(::grpc::INVALID_ARGUMENT, grpc_status.reason()); return status; @@ -88,14 +88,14 @@ RequestHandler::SearchVector(::grpc::ServerContext* context, } ::grpc::Status -RequestHandler::SearchVectorInFiles(::grpc::ServerContext* context, - const ::milvus::grpc::SearchVectorInFilesInfos* request, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { +GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext* context, + const ::milvus::grpc::SearchVectorInFilesInfos* request, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { std::vector file_id_array; BaseTaskPtr task_ptr = SearchVectorTask::Create(request->search_vector_infos(), file_id_array, *writer); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); if (grpc_status.error_code() != SERVER_SUCCESS) { ::grpc::Status status(::grpc::INVALID_ARGUMENT, grpc_status.reason()); return status; @@ -105,27 +105,27 @@ RequestHandler::SearchVectorInFiles(::grpc::ServerContext* context, } ::grpc::Status -RequestHandler::DescribeTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::TableSchema* response) { +GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, + const ::milvus::grpc::TableName* request, + ::milvus::grpc::TableSchema* response) { BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), *response); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->mutable_table_name()->mutable_status()->set_error_code(grpc_status.error_code()); response->mutable_table_name()->mutable_status()->set_reason(grpc_status.reason()); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::GetTableRowCount(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::TableRowCount* response) { +GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext* context, + const ::milvus::grpc::TableName* request, + ::milvus::grpc::TableRowCount* response) { int64_t row_count = 0; BaseTaskPtr task_ptr = GetTableRowCountTask::Create(request->table_name(), row_count); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->set_table_row_count(row_count); response->mutable_status()->set_reason(grpc_status.reason()); response->mutable_status()->set_error_code(grpc_status.error_code()); @@ -133,13 +133,13 @@ RequestHandler::GetTableRowCount(::grpc::ServerContext* context, } ::grpc::Status -RequestHandler::ShowTables(::grpc::ServerContext* context, - const ::milvus::grpc::Command* request, - ::grpc::ServerWriter<::milvus::grpc::TableName>* writer) { +GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, + const ::milvus::grpc::Command* request, + ::grpc::ServerWriter<::milvus::grpc::TableName>* writer) { BaseTaskPtr task_ptr = ShowTablesTask::Create(*writer); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); if (grpc_status.error_code() != SERVER_SUCCESS) { ::grpc::Status status(::grpc::UNKNOWN, grpc_status.reason()); return status; @@ -149,14 +149,14 @@ RequestHandler::ShowTables(::grpc::ServerContext* context, } ::grpc::Status -RequestHandler::Ping(::grpc::ServerContext* context, - const ::milvus::grpc::Command* request, - ::milvus::grpc::ServerStatus* response) { +GrpcRequestHandler::Ping(::grpc::ServerContext* context, + const ::milvus::grpc::Command* request, + ::milvus::grpc::ServerStatus* response) { std::string result; BaseTaskPtr task_ptr = PingTask::Create(request->cmd(), result); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->set_info(result); response->mutable_status()->set_reason(grpc_status.reason()); response->mutable_status()->set_error_code(grpc_status.error_code()); diff --git a/cpp/src/server/grpc_impl/RequestHandler.h b/cpp/src/server/grpc_impl/GrpcRequestHandler.h similarity index 98% rename from cpp/src/server/grpc_impl/RequestHandler.h rename to cpp/src/server/grpc_impl/GrpcRequestHandler.h index f098d19b97..34c8f7493d 100644 --- a/cpp/src/server/grpc_impl/RequestHandler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.h @@ -14,7 +14,7 @@ namespace zilliz { namespace milvus { namespace server { -class RequestHandler final : public ::milvus::grpc::MilvusService::Service { +class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service { public: /** * @brief Create table method diff --git a/cpp/src/server/grpc_impl/RequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp similarity index 90% rename from cpp/src/server/grpc_impl/RequestScheduler.cpp rename to cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp index 5358906cd3..43b3d6d791 100644 --- a/cpp/src/server/grpc_impl/RequestScheduler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp @@ -3,7 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#include "RequestScheduler.h" +#include "GrpcRequestScheduler.h" #include "utils/Log.h" #include "src/grpc/gen-status/status.pb.h" @@ -50,7 +50,7 @@ namespace { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -BaseTask::BaseTask(const std::string& task_group, bool async) +GrpcBaseTask::GrpcBaseTask(const std::string& task_group, bool async) : task_group_(task_group), async_(async), done_(false), @@ -58,12 +58,12 @@ BaseTask::BaseTask(const std::string& task_group, bool async) } -BaseTask::~BaseTask() { +GrpcBaseTask::~GrpcBaseTask() { WaitToFinish(); } ServerError -BaseTask::Execute() { +GrpcBaseTask::Execute() { error_code_ = OnExecute(); done_ = true; finish_cond_.notify_all(); @@ -71,7 +71,7 @@ BaseTask::Execute() { } ServerError -BaseTask::SetError(ServerError error_code, const std::string& error_msg) { +GrpcBaseTask::SetError(ServerError error_code, const std::string& error_msg) { error_code_ = error_code; error_msg_ = error_msg; @@ -80,7 +80,7 @@ BaseTask::SetError(ServerError error_code, const std::string& error_msg) { } ServerError -BaseTask::WaitToFinish() { +GrpcBaseTask::WaitToFinish() { std::unique_lock lock(finish_mtx_); finish_cond_.wait(lock, [this] { return done_; }); @@ -88,22 +88,22 @@ BaseTask::WaitToFinish() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -RequestScheduler::RequestScheduler() +GrpcRequestScheduler::GrpcRequestScheduler() : stopped_(false) { Start(); } -RequestScheduler::~RequestScheduler() { +GrpcRequestScheduler::~GrpcRequestScheduler() { Stop(); } void -RequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *grpc_status) { +GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *grpc_status) { if(task_ptr == nullptr) { return; } - RequestScheduler& scheduler = RequestScheduler::GetInstance(); + GrpcRequestScheduler& scheduler = GrpcRequestScheduler::GetInstance(); scheduler.ExecuteTask(task_ptr); if(!task_ptr->IsAsync()) { @@ -117,7 +117,7 @@ RequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *grpc_s } void -RequestScheduler::Start() { +GrpcRequestScheduler::Start() { if(!stopped_) { return; } @@ -126,7 +126,7 @@ RequestScheduler::Start() { } void -RequestScheduler::Stop() { +GrpcRequestScheduler::Stop() { if(stopped_) { return; } @@ -152,7 +152,7 @@ RequestScheduler::Stop() { } ServerError -RequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) { +GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) { if(task_ptr == nullptr) { return SERVER_NULL_POINTER; } @@ -196,7 +196,7 @@ namespace { } ServerError -RequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) { +GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) { std::lock_guard lock(queue_mtx_); std::string group_name = task_ptr->TaskGroup(); diff --git a/cpp/src/server/grpc_impl/RequestScheduler.h b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h similarity index 83% rename from cpp/src/server/grpc_impl/RequestScheduler.h rename to cpp/src/server/grpc_impl/GrpcRequestScheduler.h index 4b8d0da089..ca878980dd 100644 --- a/cpp/src/server/grpc_impl/RequestScheduler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h @@ -17,10 +17,10 @@ namespace zilliz { namespace milvus { namespace server { -class BaseTask { +class GrpcBaseTask { protected: - BaseTask(const std::string& task_group, bool async = false); - virtual ~BaseTask(); + GrpcBaseTask(const std::string& task_group, bool async = false); + virtual ~GrpcBaseTask(); public: ServerError @@ -59,15 +59,15 @@ protected: std::string error_msg_; }; -using BaseTaskPtr = std::shared_ptr; +using BaseTaskPtr = std::shared_ptr; using TaskQueue = BlockingQueue; using TaskQueuePtr = std::shared_ptr; using ThreadPtr = std::shared_ptr; -class RequestScheduler { +class GrpcRequestScheduler { public: - static RequestScheduler& GetInstance() { - static RequestScheduler scheduler; + static GrpcRequestScheduler& GetInstance() { + static GrpcRequestScheduler scheduler; return scheduler; } @@ -81,8 +81,8 @@ public: ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status); protected: - RequestScheduler(); - virtual ~RequestScheduler(); + GrpcRequestScheduler(); + virtual ~GrpcRequestScheduler(); ServerError PutTaskToQueue(const BaseTaskPtr& task_ptr); diff --git a/cpp/src/server/grpc_impl/RequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp similarity index 95% rename from cpp/src/server/grpc_impl/RequestTask.cpp rename to cpp/src/server/grpc_impl/GrpcRequestTask.cpp index c27daaa966..975bf4a82b 100644 --- a/cpp/src/server/grpc_impl/RequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -3,7 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#include "RequestTask.h" +#include "GrpcRequestTask.h" #include "../ServerConfig.h" #include "utils/CommonUtil.h" #include "utils/Log.h" @@ -11,7 +11,7 @@ #include "utils/ValidationUtil.h" #include "../DBWrapper.h" #include "version.h" -#include "MilvusServer.h" +#include "GrpcMilvusServer.h" #include "src/server/Server.h" @@ -100,7 +100,7 @@ namespace { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema& schema) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), schema_(schema) { } @@ -109,7 +109,7 @@ BaseTaskPtr CreateTableTask::Create(const ::milvus::grpc::TableSchema& schema) { // BaseTaskPtr create_table_task_ptr = std::make_shared(schema); // return create_table_task_ptr; - return std::shared_ptr(new CreateTableTask(schema)); + return std::shared_ptr(new CreateTableTask(schema)); } ServerError @@ -158,14 +158,14 @@ CreateTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema& schema) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), schema_(schema) { } BaseTaskPtr DescribeTableTask::Create(const std::string& table_name, ::milvus::grpc::TableSchema& schema) { - return std::shared_ptr(new DescribeTableTask(table_name, schema)); + return std::shared_ptr(new DescribeTableTask(table_name, schema)); } ServerError @@ -204,13 +204,13 @@ DescribeTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// BuildIndexTask::BuildIndexTask(const std::string& table_name) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) { } BaseTaskPtr BuildIndexTask::Create(const std::string& table_name) { - return std::shared_ptr(new BuildIndexTask(table_name)); + return std::shared_ptr(new BuildIndexTask(table_name)); } ServerError @@ -250,7 +250,7 @@ BuildIndexTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// HasTableTask::HasTableTask(const std::string& table_name, bool& has_table) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), has_table_(has_table) { @@ -258,7 +258,7 @@ HasTableTask::HasTableTask(const std::string& table_name, bool& has_table) BaseTaskPtr HasTableTask::Create(const std::string& table_name, bool& has_table) { - return std::shared_ptr(new HasTableTask(table_name, has_table)); + return std::shared_ptr(new HasTableTask(table_name, has_table)); } ServerError @@ -288,14 +288,14 @@ HasTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DropTableTask::DropTableTask(const std::string& table_name) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) { } BaseTaskPtr DropTableTask::Create(const std::string& table_name) { - return std::shared_ptr(new DropTableTask(table_name)); + return std::shared_ptr(new DropTableTask(table_name)); } ServerError @@ -340,14 +340,14 @@ DropTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ShowTablesTask::ShowTablesTask(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), writer_(writer) { } BaseTaskPtr ShowTablesTask::Create(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer) { - return std::shared_ptr(new ShowTablesTask(writer)); + return std::shared_ptr(new ShowTablesTask(writer)); } ServerError @@ -371,7 +371,7 @@ ShowTablesTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// InsertVectorTask::InsertVectorTask(const ::milvus::grpc::InsertInfos& insert_infos, ::milvus::grpc::VectorIds& record_ids) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), insert_infos_(insert_infos), record_ids_(record_ids) { record_ids_.Clear(); @@ -380,7 +380,7 @@ InsertVectorTask::InsertVectorTask(const ::milvus::grpc::InsertInfos& insert_inf BaseTaskPtr InsertVectorTask::Create(const ::milvus::grpc::InsertInfos& insert_infos, ::milvus::grpc::VectorIds& record_ids) { - return std::shared_ptr(new InsertVectorTask(insert_infos, record_ids)); + return std::shared_ptr(new InsertVectorTask(insert_infos, record_ids)); } ServerError @@ -477,7 +477,7 @@ InsertVectorTask::OnExecute() { SearchVectorTask::SearchVectorTask(const ::milvus::grpc::SearchVectorInfos& search_vector_infos, const std::vector& file_id_array, ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer) - : BaseTask(DQL_TASK_GROUP), + : GrpcBaseTask(DQL_TASK_GROUP), search_vector_infos_(search_vector_infos), file_id_array_(file_id_array), writer_(writer) { @@ -488,7 +488,7 @@ BaseTaskPtr SearchVectorTask::Create(const ::milvus::grpc::SearchVectorInfos& search_vector_infos, const std::vector& file_id_array, ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer) { - return std::shared_ptr(new SearchVectorTask(search_vector_infos, file_id_array, + return std::shared_ptr(new SearchVectorTask(search_vector_infos, file_id_array, writer)); } @@ -630,7 +630,7 @@ SearchVectorTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), row_count_(row_count) { @@ -638,7 +638,7 @@ GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_ BaseTaskPtr GetTableRowCountTask::Create(const std::string& table_name, int64_t& row_count) { - return std::shared_ptr(new GetTableRowCountTask(table_name, row_count)); + return std::shared_ptr(new GetTableRowCountTask(table_name, row_count)); } ServerError @@ -673,7 +673,7 @@ GetTableRowCountTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// PingTask::PingTask(const std::string& cmd, std::string& result) - : BaseTask(PING_TASK_GROUP), + : GrpcBaseTask(PING_TASK_GROUP), cmd_(cmd), result_(result) { @@ -681,7 +681,7 @@ PingTask::PingTask(const std::string& cmd, std::string& result) BaseTaskPtr PingTask::Create(const std::string& cmd, std::string& result) { - return std::shared_ptr(new PingTask(cmd, result)); + return std::shared_ptr(new PingTask(cmd, result)); } ServerError diff --git a/cpp/src/server/grpc_impl/RequestTask.h b/cpp/src/server/grpc_impl/GrpcRequestTask.h similarity index 91% rename from cpp/src/server/grpc_impl/RequestTask.h rename to cpp/src/server/grpc_impl/GrpcRequestTask.h index 1bca8e0a68..cb75bf7ece 100644 --- a/cpp/src/server/grpc_impl/RequestTask.h +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.h @@ -4,7 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ #pragma once -#include "RequestScheduler.h" +#include "GrpcRequestScheduler.h" #include "utils/Error.h" #include "db/Types.h" @@ -19,7 +19,7 @@ namespace milvus { namespace server { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class CreateTableTask : public BaseTask { +class CreateTableTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const ::milvus::grpc::TableSchema& schema); @@ -36,7 +36,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class HasTableTask : public BaseTask { +class HasTableTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name, bool& has_table); @@ -54,7 +54,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class DescribeTableTask : public BaseTask { +class DescribeTableTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name, ::milvus::grpc::TableSchema& schema); @@ -72,7 +72,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class DropTableTask : public BaseTask { +class DropTableTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name); @@ -90,7 +90,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class BuildIndexTask : public BaseTask { +class BuildIndexTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name); @@ -108,7 +108,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class ShowTablesTask : public BaseTask { +class ShowTablesTask : public GrpcBaseTask { public: static BaseTaskPtr Create(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer); @@ -125,7 +125,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class InsertVectorTask : public BaseTask { +class InsertVectorTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const ::milvus::grpc::InsertInfos& insert_infos, @@ -144,7 +144,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class SearchVectorTask : public BaseTask { +class SearchVectorTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const ::milvus::grpc::SearchVectorInfos& searchVectorInfos, @@ -166,7 +166,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class GetTableRowCountTask : public BaseTask { +class GetTableRowCountTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name, int64_t& row_count); @@ -183,7 +183,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class PingTask : public BaseTask { +class PingTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& cmd, std::string& result); diff --git a/cpp/src/server/thrift_impl/RequestHandler.cpp b/cpp/src/server/thrift_impl/RequestHandler.cpp index 5074043148..e7c8e8b1b5 100644 --- a/cpp/src/server/thrift_impl/RequestHandler.cpp +++ b/cpp/src/server/thrift_impl/RequestHandler.cpp @@ -47,18 +47,18 @@ RequestHandler::BuildIndex(const std::string &table_name) { void RequestHandler::AddVector(std::vector &_return, - const std::string &table_name, - const std::vector &record_array) { + const std::string &table_name, + const std::vector &record_array) { BaseTaskPtr task_ptr = AddVectorTask::Create(table_name, record_array, _return); RequestScheduler::ExecTask(task_ptr); } void RequestHandler::SearchVector(std::vector &_return, - const std::string &table_name, - const std::vector &query_record_array, - const std::vector &query_range_array, - const int64_t topk) { + const std::string &table_name, + const std::vector &query_record_array, + const std::vector &query_range_array, + const int64_t topk) { // SERVER_LOG_DEBUG << "Entering RequestHandler::SearchVector"; BaseTaskPtr task_ptr = SearchVectorTask1::Create(table_name, std::vector(), query_record_array, query_range_array, topk, _return); @@ -67,10 +67,10 @@ RequestHandler::SearchVector(std::vector &_return, void RequestHandler::SearchVector2(std::vector & _return, - const std::string& table_name, - const std::vector & query_record_array, - const std::vector & query_range_array, - const int64_t topk) { + const std::string& table_name, + const std::vector & query_record_array, + const std::vector & query_range_array, + const int64_t topk) { BaseTaskPtr task_ptr = SearchVectorTask2::Create(table_name, std::vector(), query_record_array, query_range_array, topk, _return); RequestScheduler::ExecTask(task_ptr); @@ -78,11 +78,11 @@ RequestHandler::SearchVector2(std::vector & _return, void RequestHandler::SearchVectorInFiles(std::vector<::milvus::thrift::TopKQueryResult> &_return, - const std::string& table_name, - const std::vector &file_id_array, - const std::vector<::milvus::thrift::RowRecord> &query_record_array, - const std::vector<::milvus::thrift::Range> &query_range_array, - const int64_t topk) { + const std::string& table_name, + const std::vector &file_id_array, + const std::vector<::milvus::thrift::RowRecord> &query_record_array, + const std::vector<::milvus::thrift::Range> &query_range_array, + const int64_t topk) { // SERVER_LOG_DEBUG << "Entering RequestHandler::SearchVectorInFiles. file_id_array size = " << std::to_string(file_id_array.size()); BaseTaskPtr task_ptr = SearchVectorTask1::Create(table_name, file_id_array, query_record_array, query_range_array, topk, _return); From ebd8cd9aeb725c6e94e7a76abf0f8800c997ffe0 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Fri, 9 Aug 2019 17:00:32 +0800 Subject: [PATCH 2/3] merge upstream and fix bug Former-commit-id: 45fa6852c3ad2303263cc836d30f3d2f76dd964f --- cpp/CHANGELOG.md | 1 + cpp/src/CMakeLists.txt | 1 + cpp/src/sdk/CMakeLists.txt | 6 +- cpp/src/sdk/examples/CMakeLists.txt | 6 +- cpp/src/sdk/examples/grpcsimple/main.cpp | 2 +- cpp/src/server/Server.cpp | 13 +- cpp/src/server/grpc_impl/GrpcMilvusServer.cpp | 22 +- cpp/src/server/grpc_impl/GrpcMilvusServer.h | 11 +- .../server/grpc_impl/GrpcRequestHandler.cpp | 65 ++--- cpp/src/server/grpc_impl/GrpcRequestHandler.h | 62 +++-- .../server/grpc_impl/GrpcRequestScheduler.cpp | 64 ++--- .../server/grpc_impl/GrpcRequestScheduler.h | 26 +- cpp/src/server/grpc_impl/GrpcRequestTask.cpp | 241 +++++++++--------- cpp/src/server/grpc_impl/GrpcRequestTask.h | 75 +++--- 14 files changed, 317 insertions(+), 278 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index adcceba3e8..b7b360b30f 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -60,6 +60,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-310 - Add milvus CPU utilization ratio and CPU/GPU temperature metrics - MS-324 - Show error when there is not enough gpu memory to build index - MS-328 - Check metric type on server start +- MS-332 - Set grpc and thrift server run concurrently ## New Feature - MS-180 - Add new mem manager diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index b84dde009c..8d2bd6b69e 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -223,6 +223,7 @@ set(knowhere_libs ${grpcserver_files} ${utils_files} ${thrift_service_files} + ${grpc_service_files} ${metrics_files}) if (ENABLE_LICENSE STREQUAL "ON") diff --git a/cpp/src/sdk/CMakeLists.txt b/cpp/src/sdk/CMakeLists.txt index 21fec895c1..5d00b9997f 100644 --- a/cpp/src/sdk/CMakeLists.txt +++ b/cpp/src/sdk/CMakeLists.txt @@ -12,7 +12,7 @@ include_directories(/usr/include) include_directories(include) include_directories(/usr/local/include) -if (MILVUS_WITH_THRIFT STREQUAL "ON") +#if (MILVUS_WITH_THRIFT STREQUAL "ON") aux_source_directory(thrift thrift_client_files) include_directories(thrift) include_directories(${CMAKE_SOURCE_DIR}/src/thrift/gen-cpp) @@ -34,7 +34,7 @@ if (MILVUS_WITH_THRIFT STREQUAL "ON") ${third_party_libs} ) install(TARGETS milvus_thrift_sdk DESTINATION lib) -else() +#else() aux_source_directory(grpc grpc_client_files) include_directories(${CMAKE_SOURCE_DIR}/src/grpc/gen-milvus) @@ -58,6 +58,6 @@ else() ${third_party_libs} ) install(TARGETS milvus_grpc_sdk DESTINATION lib) -endif() +#endif() add_subdirectory(examples) diff --git a/cpp/src/sdk/examples/CMakeLists.txt b/cpp/src/sdk/examples/CMakeLists.txt index ceeb88c0e0..10b59ace25 100644 --- a/cpp/src/sdk/examples/CMakeLists.txt +++ b/cpp/src/sdk/examples/CMakeLists.txt @@ -4,8 +4,8 @@ # Proprietary and confidential. #------------------------------------------------------------------------------- -if (MILVUS_WITH_THRIFT STREQUAL "ON") +#if (MILVUS_WITH_THRIFT STREQUAL "ON") add_subdirectory(thriftsimple) -else() +#else() add_subdirectory(grpcsimple) -endif() \ No newline at end of file +#endif() \ No newline at end of file diff --git a/cpp/src/sdk/examples/grpcsimple/main.cpp b/cpp/src/sdk/examples/grpcsimple/main.cpp index 499b8a9935..a7b1bf4643 100644 --- a/cpp/src/sdk/examples/grpcsimple/main.cpp +++ b/cpp/src/sdk/examples/grpcsimple/main.cpp @@ -25,7 +25,7 @@ main(int argc, char *argv[]) { {NULL, 0, 0, 0}}; int option_index = 0; - std::string address = "127.0.0.1", port = "19530"; + std::string address = "127.0.0.1", port = "19531"; app_name = argv[0]; int value; diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 13218faae2..ea2a5b7352 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -3,6 +3,7 @@ // Unauthorized copying of this file, via any medium is strictly prohibited. // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// +#include #include "Server.h" //#include "ServerConfig.h" //#ifdef MILVUS_ENABLE_THRIFT @@ -224,13 +225,19 @@ Server::LoadConfig() { void Server::StartService() { - MilvusServer::StartService(); - GrpcMilvusServer::StartService(); + std::thread thrift_thread = std::thread(&MilvusServer::StartService); + std::thread grpc_thread = std::thread(&grpc::GrpcMilvusServer::StartService); + thrift_thread.join(); + grpc_thread.join(); +// +// MilvusServer::StartService(); +// grpc::GrpcMilvusServer::StartService(); } void Server::StopService() { - GrpcMilvusServer::StopService(); + MilvusServer::StartService(); + grpc::GrpcMilvusServer::StopService(); } } diff --git a/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp index 98631a5dda..10ceaca788 100644 --- a/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp +++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ +* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +* Unauthorized copying of this file, via any medium is strictly prohibited. +* Proprietary and confidential. +******************************************************************************/ #include "milvus.grpc.pb.h" #include "GrpcMilvusServer.h" #include "../ServerConfig.h" @@ -28,14 +28,15 @@ namespace zilliz { namespace milvus { namespace server { +namespace grpc { -static std::unique_ptr server; +static std::unique_ptr<::grpc::Server> server; constexpr long MESSAGE_SIZE = -1; void GrpcMilvusServer::StartService() { - if (server != nullptr){ + if (server != nullptr) { std::cout << "stopservice!\n"; StopService(); } @@ -44,15 +45,15 @@ GrpcMilvusServer::StartService() { ConfigNode server_config = config.GetConfig(CONFIG_SERVER); ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE); std::string address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1"); - int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19531); + int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19530); faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20); DBWrapper::DB();//initialize db - std::string server_address(address + ":" + std::to_string(port)); + std::string server_address(address + ":" + std::to_string(port + 1)); - grpc::ServerBuilder builder; + ::grpc::ServerBuilder builder; builder.SetMaxReceiveMessageSize(MESSAGE_SIZE); //default 4 * 1024 * 1024 builder.SetMaxSendMessageSize(MESSAGE_SIZE); @@ -62,7 +63,7 @@ GrpcMilvusServer::StartService() { GrpcRequestHandler service; - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials()); builder.RegisterService(&service); server = builder.BuildAndStart(); @@ -77,6 +78,7 @@ GrpcMilvusServer::StopService() { } } +} } } } \ No newline at end of file diff --git a/cpp/src/server/grpc_impl/GrpcMilvusServer.h b/cpp/src/server/grpc_impl/GrpcMilvusServer.h index 4a3eec689f..fe62563bdf 100644 --- a/cpp/src/server/grpc_impl/GrpcMilvusServer.h +++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.h @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ +* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +* Unauthorized copying of this file, via any medium is strictly prohibited. +* Proprietary and confidential. +******************************************************************************/ #pragma once #include @@ -11,6 +11,8 @@ namespace zilliz { namespace milvus { namespace server { +namespace grpc { + class GrpcMilvusServer { public: static void @@ -23,3 +25,4 @@ public: } } } +} diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp index 3d9fd01398..562f4912b1 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ +* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +* Unauthorized copying of this file, via any medium is strictly prohibited. +* Proprietary and confidential. +******************************************************************************/ #include "GrpcRequestHandler.h" #include "GrpcRequestTask.h" @@ -11,6 +11,7 @@ namespace zilliz { namespace milvus { namespace server { +namespace grpc { ::grpc::Status GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, @@ -38,9 +39,9 @@ GrpcRequestHandler::HasTable(::grpc::ServerContext *context, } ::grpc::Status -GrpcRequestHandler::DropTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::Status* response) { +GrpcRequestHandler::DropTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name()); GrpcRequestScheduler::ExecTask(task_ptr, response); @@ -48,9 +49,9 @@ GrpcRequestHandler::DropTable(::grpc::ServerContext* context, } ::grpc::Status -GrpcRequestHandler::BuildIndex(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::Status* response) { +GrpcRequestHandler::BuildIndex(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = BuildIndexTask::Create(request->table_name()); GrpcRequestScheduler::ExecTask(task_ptr, response); @@ -58,9 +59,9 @@ GrpcRequestHandler::BuildIndex(::grpc::ServerContext* context, } ::grpc::Status -GrpcRequestHandler::InsertVector(::grpc::ServerContext* context, - const ::milvus::grpc::InsertInfos* request, - ::milvus::grpc::VectorIds* response) { +GrpcRequestHandler::InsertVector(::grpc::ServerContext *context, + const ::milvus::grpc::InsertInfos *request, + ::milvus::grpc::VectorIds *response) { BaseTaskPtr task_ptr = InsertVectorTask::Create(*request, *response); ::milvus::grpc::Status grpc_status; @@ -71,9 +72,9 @@ GrpcRequestHandler::InsertVector(::grpc::ServerContext* context, } ::grpc::Status -GrpcRequestHandler::SearchVector(::grpc::ServerContext* context, - const ::milvus::grpc::SearchVectorInfos* request, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { +GrpcRequestHandler::SearchVector(::grpc::ServerContext *context, + const ::milvus::grpc::SearchVectorInfos *request, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) { std::vector file_id_array; BaseTaskPtr task_ptr = SearchVectorTask::Create(*request, file_id_array, *writer); @@ -88,9 +89,9 @@ GrpcRequestHandler::SearchVector(::grpc::ServerContext* context, } ::grpc::Status -GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext* context, - const ::milvus::grpc::SearchVectorInFilesInfos* request, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { +GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext *context, + const ::milvus::grpc::SearchVectorInFilesInfos *request, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) { std::vector file_id_array; BaseTaskPtr task_ptr = SearchVectorTask::Create(request->search_vector_infos(), file_id_array, *writer); @@ -105,9 +106,9 @@ GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext* context, } ::grpc::Status -GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::TableSchema* response) { +GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, + ::milvus::grpc::TableSchema *response) { BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), *response); ::milvus::grpc::Status grpc_status; @@ -118,9 +119,9 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, } ::grpc::Status -GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::TableRowCount* response) { +GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, + ::milvus::grpc::TableRowCount *response) { int64_t row_count = 0; BaseTaskPtr task_ptr = GetTableRowCountTask::Create(request->table_name(), row_count); @@ -133,9 +134,9 @@ GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext* context, } ::grpc::Status -GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, - const ::milvus::grpc::Command* request, - ::grpc::ServerWriter<::milvus::grpc::TableName>* writer) { +GrpcRequestHandler::ShowTables(::grpc::ServerContext *context, + const ::milvus::grpc::Command *request, + ::grpc::ServerWriter<::milvus::grpc::TableName> *writer) { BaseTaskPtr task_ptr = ShowTablesTask::Create(*writer); ::milvus::grpc::Status grpc_status; @@ -149,9 +150,9 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, } ::grpc::Status -GrpcRequestHandler::Ping(::grpc::ServerContext* context, - const ::milvus::grpc::Command* request, - ::milvus::grpc::ServerStatus* response) { +GrpcRequestHandler::Ping(::grpc::ServerContext *context, + const ::milvus::grpc::Command *request, + ::milvus::grpc::ServerStatus *response) { std::string result; BaseTaskPtr task_ptr = PingTask::Create(request->cmd(), result); @@ -163,7 +164,7 @@ GrpcRequestHandler::Ping(::grpc::ServerContext* context, return ::grpc::Status::OK; } - +} } } } \ No newline at end of file diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.h b/cpp/src/server/grpc_impl/GrpcRequestHandler.h index 34c8f7493d..5b21058331 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.h @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ +* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +* Unauthorized copying of this file, via any medium is strictly prohibited. +* Proprietary and confidential. +******************************************************************************/ #pragma once #include @@ -14,6 +14,7 @@ namespace zilliz { namespace milvus { namespace server { +namespace grpc { class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service { public: /** @@ -32,8 +33,8 @@ public: * @param context */ ::grpc::Status - CreateTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableSchema* request, ::milvus::grpc::Status* response) override ; + CreateTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableSchema *request, ::milvus::grpc::Status *response) override; /** * @brief Test table existence method @@ -51,8 +52,8 @@ public: * @param context */ ::grpc::Status - HasTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, ::milvus::grpc::BoolReply* response) override ; + HasTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, ::milvus::grpc::BoolReply *response) override; /** * @brief Drop table method @@ -70,8 +71,8 @@ public: * @param context */ ::grpc::Status - DropTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, ::milvus::grpc::Status* response) override; + DropTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, ::milvus::grpc::Status *response) override; /** * @brief build index by table method @@ -89,8 +90,8 @@ public: * @param context */ ::grpc::Status - BuildIndex(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, ::milvus::grpc::Status* response) override; + BuildIndex(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, ::milvus::grpc::Status *response) override; /** @@ -109,8 +110,9 @@ public: * @param response */ ::grpc::Status - InsertVector(::grpc::ServerContext* context, - const ::milvus::grpc::InsertInfos* request, ::milvus::grpc::VectorIds* response) override; + InsertVector(::grpc::ServerContext *context, + const ::milvus::grpc::InsertInfos *request, + ::milvus::grpc::VectorIds *response) override; /** * @brief Query vector @@ -133,8 +135,9 @@ public: * @param writer */ ::grpc::Status - SearchVector(::grpc::ServerContext* context, - const ::milvus::grpc::SearchVectorInfos* request, ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) override; + SearchVector(::grpc::ServerContext *context, + const ::milvus::grpc::SearchVectorInfos *request, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) override; /** * @brief Internal use query interface @@ -157,8 +160,9 @@ public: * @param writer */ ::grpc::Status - SearchVectorInFiles(::grpc::ServerContext* context, - const ::milvus::grpc::SearchVectorInFilesInfos* request, ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) override; + SearchVectorInFiles(::grpc::ServerContext *context, + const ::milvus::grpc::SearchVectorInFilesInfos *request, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) override; /** * @brief Get table schema @@ -176,8 +180,9 @@ public: * @param response */ ::grpc::Status - DescribeTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, ::milvus::grpc::TableSchema* response) override; + DescribeTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, + ::milvus::grpc::TableSchema *response) override; /** * @brief Get table row count @@ -195,8 +200,9 @@ public: * @param context */ ::grpc::Status - GetTableRowCount(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, ::milvus::grpc::TableRowCount* response) override; + GetTableRowCount(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, + ::milvus::grpc::TableRowCount *response) override; /** * @brief List all tables in database @@ -214,8 +220,9 @@ public: * @param writer */ ::grpc::Status - ShowTables(::grpc::ServerContext* context, - const ::milvus::grpc::Command* request, ::grpc::ServerWriter< ::milvus::grpc::TableName>* writer) override; + ShowTables(::grpc::ServerContext *context, + const ::milvus::grpc::Command *request, + ::grpc::ServerWriter<::milvus::grpc::TableName> *writer) override; /** * @brief Give the server status @@ -233,13 +240,12 @@ public: * @param response */ ::grpc::Status - Ping(::grpc::ServerContext* context, - const ::milvus::grpc::Command* request, ::milvus::grpc::ServerStatus* response) override; + Ping(::grpc::ServerContext *context, + const ::milvus::grpc::Command *request, ::milvus::grpc::ServerStatus *response) override; }; } } } - - +} diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp index 43b3d6d791..ef60aba8a3 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ +* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +* Unauthorized copying of this file, via any medium is strictly prohibited. +* Proprietary and confidential. +******************************************************************************/ #include "GrpcRequestScheduler.h" #include "utils/Log.h" @@ -11,11 +11,12 @@ namespace zilliz { namespace milvus { namespace server { +namespace grpc { using namespace ::milvus; namespace { - const std::map &ErrorMap() { + const std::map &ErrorMap() { static const std::map code_map = { {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, @@ -50,7 +51,7 @@ namespace { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -GrpcBaseTask::GrpcBaseTask(const std::string& task_group, bool async) +GrpcBaseTask::GrpcBaseTask(const std::string &task_group, bool async) : task_group_(task_group), async_(async), done_(false), @@ -71,7 +72,7 @@ GrpcBaseTask::Execute() { } ServerError -GrpcBaseTask::SetError(ServerError error_code, const std::string& error_msg) { +GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) { error_code_ = error_code; error_msg_ = error_msg; @@ -81,7 +82,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string& error_msg) { ServerError GrpcBaseTask::WaitToFinish() { - std::unique_lock lock(finish_mtx_); + std::unique_lock lock(finish_mtx_); finish_cond_.wait(lock, [this] { return done_; }); return error_code_; @@ -98,15 +99,15 @@ GrpcRequestScheduler::~GrpcRequestScheduler() { } void -GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *grpc_status) { - if(task_ptr == nullptr) { +GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { + if (task_ptr == nullptr) { return; } - GrpcRequestScheduler& scheduler = GrpcRequestScheduler::GetInstance(); + GrpcRequestScheduler &scheduler = GrpcRequestScheduler::GetInstance(); scheduler.ExecuteTask(task_ptr); - if(!task_ptr->IsAsync()) { + if (!task_ptr->IsAsync()) { task_ptr->WaitToFinish(); ServerError err = task_ptr->ErrorCode(); if (err != SERVER_SUCCESS) { @@ -118,7 +119,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *gr void GrpcRequestScheduler::Start() { - if(!stopped_) { + if (!stopped_) { return; } @@ -127,22 +128,22 @@ GrpcRequestScheduler::Start() { void GrpcRequestScheduler::Stop() { - if(stopped_) { + if (stopped_) { return; } SERVER_LOG_INFO << "Scheduler gonna stop..."; { std::lock_guard lock(queue_mtx_); - for(auto iter : task_groups_) { - if(iter.second != nullptr) { + for (auto iter : task_groups_) { + if (iter.second != nullptr) { iter.second->Put(nullptr); } } } - for(auto iter : execute_threads_) { - if(iter == nullptr) + for (auto iter : execute_threads_) { + if (iter == nullptr) continue; iter->join(); @@ -152,18 +153,18 @@ GrpcRequestScheduler::Stop() { } ServerError -GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) { - if(task_ptr == nullptr) { +GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { + if (task_ptr == nullptr) { return SERVER_NULL_POINTER; } - ServerError err = PutTaskToQueue(task_ptr); - if(err != SERVER_SUCCESS) { - SERVER_LOG_ERROR << "Put task to queue failed with code: " << err ; + ServerError err = PutTaskToQueue(task_ptr); + if (err != SERVER_SUCCESS) { + SERVER_LOG_ERROR << "Put task to queue failed with code: " << err; return err; } - if(task_ptr->IsAsync()) { + if (task_ptr->IsAsync()) { return SERVER_SUCCESS;//async execution, caller need to call WaitToFinish at somewhere } @@ -172,11 +173,11 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) { namespace { void TakeTaskToExecute(TaskQueuePtr task_queue) { - if(task_queue == nullptr) { + if (task_queue == nullptr) { return; } - while(true) { + while (true) { BaseTaskPtr task = task_queue->Take(); if (task == nullptr) { SERVER_LOG_ERROR << "Take null from task queue, stop thread"; @@ -185,22 +186,22 @@ namespace { try { ServerError err = task->Execute(); - if(err != SERVER_SUCCESS) { + if (err != SERVER_SUCCESS) { SERVER_LOG_ERROR << "Task failed with code: " << err; } - } catch (std::exception& ex) { + } catch (std::exception &ex) { SERVER_LOG_ERROR << "Task failed to execute: " << ex.what(); } } } } -ServerError -GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) { +ServerError +GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { std::lock_guard lock(queue_mtx_); std::string group_name = task_ptr->TaskGroup(); - if(task_groups_.count(group_name) > 0) { + if (task_groups_.count(group_name) > 0) { task_groups_[group_name]->Put(task_ptr); } else { TaskQueuePtr queue = std::make_shared(); @@ -219,3 +220,4 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) { } } } +} diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h index ca878980dd..a436e8dec6 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ +* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +* Unauthorized copying of this file, via any medium is strictly prohibited. +* Proprietary and confidential. +******************************************************************************/ #pragma once #include "utils/BlockingQueue.h" @@ -16,10 +16,12 @@ namespace zilliz { namespace milvus { namespace server { +namespace grpc { class GrpcBaseTask { protected: - GrpcBaseTask(const std::string& task_group, bool async = false); + GrpcBaseTask(const std::string &task_group, bool async = false); + virtual ~GrpcBaseTask(); public: @@ -46,7 +48,7 @@ protected: OnExecute() = 0; ServerError - SetError(ServerError error_code, const std::string& msg); + SetError(ServerError error_code, const std::string &msg); protected: mutable std::mutex finish_mtx_; @@ -66,26 +68,28 @@ using ThreadPtr = std::shared_ptr; class GrpcRequestScheduler { public: - static GrpcRequestScheduler& GetInstance() { + static GrpcRequestScheduler &GetInstance() { static GrpcRequestScheduler scheduler; return scheduler; } void Start(); + void Stop(); ServerError - ExecuteTask(const BaseTaskPtr& task_ptr); + ExecuteTask(const BaseTaskPtr &task_ptr); static void - ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status); + ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status); protected: GrpcRequestScheduler(); + virtual ~GrpcRequestScheduler(); ServerError - PutTaskToQueue(const BaseTaskPtr& task_ptr); + PutTaskToQueue(const BaseTaskPtr &task_ptr); private: mutable std::mutex queue_mtx_; @@ -97,7 +101,7 @@ private: bool stopped_; }; - +} } } } diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp index 975bf4a82b..2a12528206 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ +* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +* Unauthorized copying of this file, via any medium is strictly prohibited. +* Proprietary and confidential. +******************************************************************************/ #include "GrpcRequestTask.h" #include "../ServerConfig.h" #include "utils/CommonUtil.h" @@ -18,9 +18,11 @@ namespace zilliz { namespace milvus { namespace server { -static const char* DQL_TASK_GROUP = "dql"; -static const char* DDL_DML_TASK_GROUP = "ddl_dml"; -static const char* PING_TASK_GROUP = "ping"; +namespace grpc { + +static const char *DQL_TASK_GROUP = "dql"; +static const char *DDL_DML_TASK_GROUP = "ddl_dml"; +static const char *PING_TASK_GROUP = "ping"; using DB_META = zilliz::milvus::engine::meta::Meta; using DB_DATE = zilliz::milvus::engine::meta::DateT; @@ -34,7 +36,7 @@ namespace { {3, engine::EngineType::FAISS_IVFSQ8}, }; - if(map_type.find(type) == map_type.end()) { + if (map_type.find(type) == map_type.end()) { return engine::EngineType::INVALID; } @@ -43,13 +45,13 @@ namespace { int IndexType(engine::EngineType type) { static std::map map_type = { - {engine::EngineType::INVALID, 0}, - {engine::EngineType::FAISS_IDMAP, 1}, + {engine::EngineType::INVALID, 0}, + {engine::EngineType::FAISS_IDMAP, 1}, {engine::EngineType::FAISS_IVFFLAT, 2}, - {engine::EngineType::FAISS_IVFSQ8, 3}, + {engine::EngineType::FAISS_IVFSQ8, 3}, }; - if(map_type.find(type) == map_type.end()) { + if (map_type.find(type) == map_type.end()) { return 0; } @@ -60,38 +62,40 @@ namespace { void ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array, - std::vector& dates, - ServerError& error_code, - std::string& error_msg) { + std::vector &dates, + ServerError &error_code, + std::string &error_msg) { dates.clear(); - for(auto& range : range_array) { + for (auto &range : range_array) { time_t tt_start, tt_end; tm tm_start, tm_end; - if(!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)){ + if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) { error_code = SERVER_INVALID_TIME_RANGE; error_msg = "Invalid time range: " + range.start_value(); return; } - if(!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)){ + if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) { error_code = SERVER_INVALID_TIME_RANGE; error_msg = "Invalid time range: " + range.start_value(); return; } - long days = (tt_end > tt_start) ? (tt_end - tt_start)/DAY_SECONDS : (tt_start - tt_end)/DAY_SECONDS; - if(days == 0) { + long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / + DAY_SECONDS; + if (days == 0) { error_code = SERVER_INVALID_TIME_RANGE; error_msg = "Invalid time range: " + range.start_value() + " to " + range.end_value(); - return ; + return; } - for(long i = 0; i < days; i++) { - time_t tt_day = tt_start + DAY_SECONDS*i; + for (long i = 0; i < days; i++) { + time_t tt_day = tt_start + DAY_SECONDS * i; tm tm_day; CommonUtil::ConvertTime(tt_day, tm_day); - long date = tm_day.tm_year*10000 + tm_day.tm_mon*100 + tm_day.tm_mday;//according to db logic + long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + + tm_day.tm_mday;//according to db logic dates.push_back(date); } } @@ -99,16 +103,14 @@ namespace { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema& schema) +CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema &schema) : GrpcBaseTask(DDL_DML_TASK_GROUP), schema_(schema) { } BaseTaskPtr -CreateTableTask::Create(const ::milvus::grpc::TableSchema& schema) { -// BaseTaskPtr create_table_task_ptr = std::make_shared(schema); -// return create_table_task_ptr; +CreateTableTask::Create(const ::milvus::grpc::TableSchema &schema) { return std::shared_ptr(new CreateTableTask(schema)); } @@ -119,35 +121,35 @@ CreateTableTask::OnExecute() { try { //step 1: check arguments ServerError res = ValidationUtil::ValidateTableName(schema_.table_name().table_name()); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + schema_.table_name().table_name()); } res = ValidationUtil::ValidateTableDimension(schema_.dimension()); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table dimension: " + std::to_string(schema_.dimension())); } res = ValidationUtil::ValidateTableIndexType(schema_.index_type()); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid index type: " + std::to_string(schema_.index_type())); } //step 2: construct table schema engine::meta::TableSchema table_info; - table_info.dimension_ = (uint16_t)schema_.dimension(); + table_info.dimension_ = (uint16_t) schema_.dimension(); table_info.table_id_ = schema_.table_name().table_name(); - table_info.engine_type_ = (int)EngineType(schema_.index_type()); + table_info.engine_type_ = (int) EngineType(schema_.index_type()); table_info.store_raw_data_ = schema_.store_raw_vector(); //step 3: create table engine::Status stat = DBWrapper::DB()->CreateTable(table_info); - if(!stat.ok()) { + if (!stat.ok()) { //table could exist return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } - } catch (std::exception& ex) { + } catch (std::exception &ex) { return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -157,14 +159,14 @@ CreateTableTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema& schema) +DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema &schema) : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), schema_(schema) { } BaseTaskPtr -DescribeTableTask::Create(const std::string& table_name, ::milvus::grpc::TableSchema& schema) { +DescribeTableTask::Create(const std::string &table_name, ::milvus::grpc::TableSchema &schema) { return std::shared_ptr(new DescribeTableTask(table_name, schema)); } @@ -175,7 +177,7 @@ DescribeTableTask::OnExecute() { try { //step 1: check arguments ServerError res = ValidationUtil::ValidateTableName(table_name_); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + table_name_); } @@ -183,17 +185,17 @@ DescribeTableTask::OnExecute() { engine::meta::TableSchema table_info; table_info.table_id_ = table_name_; engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); - if(!stat.ok()) { + if (!stat.ok()) { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } schema_.mutable_table_name()->set_table_name(table_info.table_id_); - schema_.set_index_type(IndexType((engine::EngineType)table_info.engine_type_)); + schema_.set_index_type(IndexType((engine::EngineType) table_info.engine_type_)); schema_.set_dimension(table_info.dimension_); schema_.set_store_raw_vector(table_info.store_raw_data_); - } catch (std::exception& ex) { + } catch (std::exception &ex) { return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -203,13 +205,13 @@ DescribeTableTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -BuildIndexTask::BuildIndexTask(const std::string& table_name) +BuildIndexTask::BuildIndexTask(const std::string &table_name) : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) { } BaseTaskPtr -BuildIndexTask::Create(const std::string& table_name) { +BuildIndexTask::Create(const std::string &table_name) { return std::shared_ptr(new BuildIndexTask(table_name)); } @@ -220,28 +222,28 @@ BuildIndexTask::OnExecute() { //step 1: check arguments ServerError res = ValidationUtil::ValidateTableName(table_name_); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + table_name_); } bool has_table = false; engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table); - if(!stat.ok()) { + if (!stat.ok()) { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } - if(!has_table) { + if (!has_table) { return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists"); } //step 2: check table existence stat = DBWrapper::DB()->BuildIndex(table_name_); - if(!stat.ok()) { + if (!stat.ok()) { return SetError(SERVER_BUILD_INDEX_ERROR, "Engine failed: " + stat.ToString()); } rc.ElapseFromBegin("totally cost"); - } catch (std::exception& ex) { + } catch (std::exception &ex) { return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -249,7 +251,7 @@ BuildIndexTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -HasTableTask::HasTableTask(const std::string& table_name, bool& has_table) +HasTableTask::HasTableTask(const std::string &table_name, bool &has_table) : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), has_table_(has_table) { @@ -257,7 +259,7 @@ HasTableTask::HasTableTask(const std::string& table_name, bool& has_table) } BaseTaskPtr -HasTableTask::Create(const std::string& table_name, bool& has_table) { +HasTableTask::Create(const std::string &table_name, bool &has_table) { return std::shared_ptr(new HasTableTask(table_name, has_table)); } @@ -268,18 +270,18 @@ HasTableTask::OnExecute() { //step 1: check arguments ServerError res = ValidationUtil::ValidateTableName(table_name_); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + table_name_); } //step 2: check table existence engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table_); - if(!stat.ok()) { + if (!stat.ok()) { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } rc.ElapseFromBegin("totally cost"); - } catch (std::exception& ex) { + } catch (std::exception &ex) { return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -287,14 +289,14 @@ HasTableTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -DropTableTask::DropTableTask(const std::string& table_name) +DropTableTask::DropTableTask(const std::string &table_name) : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) { } BaseTaskPtr -DropTableTask::Create(const std::string& table_name) { +DropTableTask::Create(const std::string &table_name) { return std::shared_ptr(new DropTableTask(table_name)); } @@ -305,7 +307,7 @@ DropTableTask::OnExecute() { //step 1: check arguments ServerError res = ValidationUtil::ValidateTableName(table_name_); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + table_name_); } @@ -313,8 +315,8 @@ DropTableTask::OnExecute() { engine::meta::TableSchema table_info; table_info.table_id_ = table_name_; engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); - if(!stat.ok()) { - if(stat.IsNotFound()) { + if (!stat.ok()) { + if (stat.IsNotFound()) { return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists"); } else { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); @@ -326,12 +328,12 @@ DropTableTask::OnExecute() { //step 3: Drop table std::vector dates; stat = DBWrapper::DB()->DeleteTable(table_name_, dates); - if(!stat.ok()) { + if (!stat.ok()) { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } rc.ElapseFromBegin("total cost"); - } catch (std::exception& ex) { + } catch (std::exception &ex) { return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -339,14 +341,14 @@ DropTableTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -ShowTablesTask::ShowTablesTask(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer) +ShowTablesTask::ShowTablesTask(::grpc::ServerWriter<::milvus::grpc::TableName> &writer) : GrpcBaseTask(DDL_DML_TASK_GROUP), writer_(writer) { } BaseTaskPtr -ShowTablesTask::Create(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer) { +ShowTablesTask::Create(::grpc::ServerWriter<::milvus::grpc::TableName> &writer) { return std::shared_ptr(new ShowTablesTask(writer)); } @@ -354,11 +356,11 @@ ServerError ShowTablesTask::OnExecute() { std::vector schema_array; engine::Status stat = DBWrapper::DB()->AllTables(schema_array); - if(!stat.ok()) { + if (!stat.ok()) { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } - for(auto& schema : schema_array) { + for (auto &schema : schema_array) { ::milvus::grpc::TableName tableName; tableName.set_table_name(schema.table_id_); if (!writer_.Write(tableName)) { @@ -369,8 +371,8 @@ ShowTablesTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -InsertVectorTask::InsertVectorTask(const ::milvus::grpc::InsertInfos& insert_infos, - ::milvus::grpc::VectorIds& record_ids) +InsertVectorTask::InsertVectorTask(const ::milvus::grpc::InsertInfos &insert_infos, + ::milvus::grpc::VectorIds &record_ids) : GrpcBaseTask(DDL_DML_TASK_GROUP), insert_infos_(insert_infos), record_ids_(record_ids) { @@ -378,8 +380,8 @@ InsertVectorTask::InsertVectorTask(const ::milvus::grpc::InsertInfos& insert_inf } BaseTaskPtr -InsertVectorTask::Create(const ::milvus::grpc::InsertInfos& insert_infos, - ::milvus::grpc::VectorIds& record_ids) { +InsertVectorTask::Create(const ::milvus::grpc::InsertInfos &insert_infos, + ::milvus::grpc::VectorIds &record_ids) { return std::shared_ptr(new InsertVectorTask(insert_infos, record_ids)); } @@ -390,10 +392,10 @@ InsertVectorTask::OnExecute() { //step 1: check arguments ServerError res = ValidationUtil::ValidateTableName(insert_infos_.table_name()); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + insert_infos_.table_name()); } - if(insert_infos_.row_record_array().empty()) { + if (insert_infos_.row_record_array().empty()) { return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty"); } @@ -401,9 +403,10 @@ InsertVectorTask::OnExecute() { engine::meta::TableSchema table_info; table_info.table_id_ = insert_infos_.table_name(); engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); - if(!stat.ok()) { - if(stat.IsNotFound()) { - return SetError(SERVER_TABLE_NOT_EXIST, "Table " + insert_infos_.table_name() + " not exists"); + if (!stat.ok()) { + if (stat.IsNotFound()) { + return SetError(SERVER_TABLE_NOT_EXIST, + "Table " + insert_infos_.table_name() + " not exists"); } else { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } @@ -430,7 +433,8 @@ InsertVectorTask::OnExecute() { if (vec_dim != table_info.dimension_) { ServerError error_code = SERVER_INVALID_VECTOR_DIMENSION; std::string error_msg = "Invalid rowrecord dimension: " + std::to_string(vec_dim) - + " vs. table dimension:" + std::to_string(table_info.dimension_); + + " vs. table dimension:" + + std::to_string(table_info.dimension_); return SetError(error_code, error_msg); } vec_f[i * table_info.dimension_ + j] = insert_infos_.row_record_array(i).vector_data(j); @@ -440,12 +444,13 @@ InsertVectorTask::OnExecute() { rc.ElapseFromBegin("prepare vectors data"); //step 4: insert vectors - auto vec_count = (uint64_t)insert_infos_.row_record_array_size(); + auto vec_count = (uint64_t) insert_infos_.row_record_array_size(); std::vector vec_ids(record_ids_.vector_id_array_size(), 0); - stat = DBWrapper::DB()->InsertVectors(insert_infos_.table_name(), vec_count, vec_f.data(), vec_ids); + stat = DBWrapper::DB()->InsertVectors(insert_infos_.table_name(), vec_count, vec_f.data(), + vec_ids); rc.ElapseFromBegin("add vectors to engine"); - if(!stat.ok()) { + if (!stat.ok()) { return SetError(SERVER_CACHE_ERROR, "Cache error: " + stat.ToString()); } for (int64_t id : vec_ids) { @@ -453,7 +458,7 @@ InsertVectorTask::OnExecute() { } auto ids_size = record_ids_.vector_id_array_size(); - if(ids_size != vec_count) { + if (ids_size != vec_count) { std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return " + std::to_string(ids_size) + " id"; return SetError(SERVER_ILLEGAL_VECTOR_ID, msg); @@ -466,7 +471,7 @@ InsertVectorTask::OnExecute() { rc.RecordSection("add vectors to engine"); rc.ElapseFromBegin("total cost"); - } catch (std::exception& ex) { + } catch (std::exception &ex) { return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -474,9 +479,9 @@ InsertVectorTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -SearchVectorTask::SearchVectorTask(const ::milvus::grpc::SearchVectorInfos& search_vector_infos, - const std::vector& file_id_array, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer) +SearchVectorTask::SearchVectorTask(const ::milvus::grpc::SearchVectorInfos &search_vector_infos, + const std::vector &file_id_array, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer) : GrpcBaseTask(DQL_TASK_GROUP), search_vector_infos_(search_vector_infos), file_id_array_(file_id_array), @@ -485,11 +490,11 @@ SearchVectorTask::SearchVectorTask(const ::milvus::grpc::SearchVectorInfos& sear } BaseTaskPtr -SearchVectorTask::Create(const ::milvus::grpc::SearchVectorInfos& search_vector_infos, - const std::vector& file_id_array, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer) { +SearchVectorTask::Create(const ::milvus::grpc::SearchVectorInfos &search_vector_infos, + const std::vector &file_id_array, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer) { return std::shared_ptr(new SearchVectorTask(search_vector_infos, file_id_array, - writer)); + writer)); } ServerError @@ -500,17 +505,17 @@ SearchVectorTask::OnExecute() { //step 1: check arguments std::string table_name_ = search_vector_infos_.table_name(); ServerError res = ValidationUtil::ValidateTableName(table_name_); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + table_name_); } int top_k_ = search_vector_infos_.topk(); - if(top_k_ <= 0 || top_k_ > 1024) { + if (top_k_ <= 0 || top_k_ > 1024) { return SetError(SERVER_INVALID_TOPK, "Invalid topk: " + std::to_string( top_k_)); } - if(search_vector_infos_.query_record_array().empty()) { + if (search_vector_infos_.query_record_array().empty()) { return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty"); } @@ -518,8 +523,8 @@ SearchVectorTask::OnExecute() { engine::meta::TableSchema table_info; table_info.table_id_ = table_name_; engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); - if(!stat.ok()) { - if(stat.IsNotFound()) { + if (!stat.ok()) { + if (stat.IsNotFound()) { return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists"); } else { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); @@ -536,7 +541,7 @@ SearchVectorTask::OnExecute() { range_array.emplace_back(search_vector_infos_.query_range_array(i)); } ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg); - if(error_code != SERVER_SUCCESS) { + if (error_code != SERVER_SUCCESS) { return SetError(error_code, error_msg); } @@ -555,41 +560,46 @@ SearchVectorTask::OnExecute() { for (size_t i = 0; i < record_array_size; i++) { for (size_t j = 0; j < table_info.dimension_; j++) { if (search_vector_infos_.query_record_array(i).vector_data().empty()) { - return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Query record float array is empty"); + return SetError(SERVER_INVALID_ROWRECORD_ARRAY, + "Query record float array is empty"); } - uint64_t query_vec_dim = search_vector_infos_.query_record_array(i).vector_data().size(); + uint64_t query_vec_dim = search_vector_infos_.query_record_array( + i).vector_data().size(); if (query_vec_dim != table_info.dimension_) { ServerError error_code = SERVER_INVALID_VECTOR_DIMENSION; - std::string error_msg = "Invalid rowrecord dimension: " + std::to_string(query_vec_dim) - + " vs. table dimension:" + std::to_string(table_info.dimension_); + std::string error_msg = + "Invalid rowrecord dimension: " + std::to_string(query_vec_dim) + + " vs. table dimension:" + std::to_string(table_info.dimension_); return SetError(error_code, error_msg); } - vec_f[i * table_info.dimension_ + j] = search_vector_infos_.query_record_array(i).vector_data(j); + vec_f[i * table_info.dimension_ + j] = search_vector_infos_.query_record_array( + i).vector_data(j); } } rc.ElapseFromBegin("prepare vector data"); //step 4: search vectors engine::QueryResults results; - auto record_count = (uint64_t)search_vector_infos_.query_record_array().size(); + auto record_count = (uint64_t) search_vector_infos_.query_record_array().size(); - if(file_id_array_.empty()) { - stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results); + if (file_id_array_.empty()) { + stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), + dates, results); } else { stat = DBWrapper::DB()->Query(table_name_, file_id_array_, - (size_t) top_k_, record_count, vec_f.data(), dates, results); + (size_t) top_k_, record_count, vec_f.data(), dates, results); } rc.ElapseFromBegin("search vectors from engine"); - if(!stat.ok()) { + if (!stat.ok()) { return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } - if(results.empty()) { + if (results.empty()) { return SERVER_SUCCESS; //empty table } - if(results.size() != record_count) { + if (results.size() != record_count) { std::string msg = "Search " + std::to_string(record_count) + " vectors but only return " + std::to_string(results.size()) + " results"; return SetError(SERVER_ILLEGAL_SEARCH_RESULT, msg); @@ -598,11 +608,11 @@ SearchVectorTask::OnExecute() { rc.ElapseFromBegin("do search"); //step 5: construct result array - for(uint64_t i = 0; i < record_count; i++) { - auto& result = results[i]; + for (uint64_t i = 0; i < record_count; i++) { + auto &result = results[i]; const auto &record = search_vector_infos_.query_record_array(i); ::milvus::grpc::TopKQueryResult grpc_topk_result; - for(auto& pair : result) { + for (auto &pair : result) { ::milvus::grpc::QueryResult *grpc_result = grpc_topk_result.add_query_result_arrays(); grpc_result->set_id(pair.first); grpc_result->set_distance(pair.second); @@ -621,7 +631,7 @@ SearchVectorTask::OnExecute() { //step 6: print time cost percent - } catch (std::exception& ex) { + } catch (std::exception &ex) { return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -629,7 +639,7 @@ SearchVectorTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count) +GetTableRowCountTask::GetTableRowCountTask(const std::string &table_name, int64_t &row_count) : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), row_count_(row_count) { @@ -637,7 +647,7 @@ GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_ } BaseTaskPtr -GetTableRowCountTask::Create(const std::string& table_name, int64_t& row_count) { +GetTableRowCountTask::Create(const std::string &table_name, int64_t &row_count) { return std::shared_ptr(new GetTableRowCountTask(table_name, row_count)); } @@ -649,7 +659,7 @@ GetTableRowCountTask::OnExecute() { //step 1: check arguments ServerError res = SERVER_SUCCESS; res = ValidationUtil::ValidateTableName(table_name_); - if(res != SERVER_SUCCESS) { + if (res != SERVER_SUCCESS) { return SetError(res, "Invalid table name: " + table_name_); } @@ -664,7 +674,7 @@ GetTableRowCountTask::OnExecute() { rc.ElapseFromBegin("total cost"); - } catch (std::exception& ex) { + } catch (std::exception &ex) { return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -672,7 +682,7 @@ GetTableRowCountTask::OnExecute() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -PingTask::PingTask(const std::string& cmd, std::string& result) +PingTask::PingTask(const std::string &cmd, std::string &result) : GrpcBaseTask(PING_TASK_GROUP), cmd_(cmd), result_(result) { @@ -680,13 +690,13 @@ PingTask::PingTask(const std::string& cmd, std::string& result) } BaseTaskPtr -PingTask::Create(const std::string& cmd, std::string& result) { +PingTask::Create(const std::string &cmd, std::string &result) { return std::shared_ptr(new PingTask(cmd, result)); } ServerError PingTask::OnExecute() { - if(cmd_ == "version") { + if (cmd_ == "version") { result_ = MILVUS_VERSION; } else { result_ = "OK"; @@ -695,6 +705,7 @@ PingTask::OnExecute() { return SERVER_SUCCESS; } +} } } } \ No newline at end of file diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.h b/cpp/src/server/grpc_impl/GrpcRequestTask.h index cb75bf7ece..b875383363 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.h +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.h @@ -1,8 +1,8 @@ /******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ +* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +* Unauthorized copying of this file, via any medium is strictly prohibited. +* Proprietary and confidential. +******************************************************************************/ #pragma once #include "GrpcRequestScheduler.h" #include "utils/Error.h" @@ -17,16 +17,17 @@ namespace zilliz { namespace milvus { namespace server { +namespace grpc { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CreateTableTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const ::milvus::grpc::TableSchema& schema); + Create(const ::milvus::grpc::TableSchema &schema); protected: explicit - CreateTableTask(const ::milvus::grpc::TableSchema& request); + CreateTableTask(const ::milvus::grpc::TableSchema &request); ServerError OnExecute() override; @@ -39,10 +40,10 @@ private: class HasTableTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const std::string& table_name, bool& has_table); + Create(const std::string &table_name, bool &has_table); protected: - HasTableTask(const std::string& request, bool& has_table); + HasTableTask(const std::string &request, bool &has_table); ServerError OnExecute() override; @@ -50,17 +51,17 @@ protected: private: std::string table_name_; - bool& has_table_; + bool &has_table_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DescribeTableTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const std::string& table_name, ::milvus::grpc::TableSchema& schema); + Create(const std::string &table_name, ::milvus::grpc::TableSchema &schema); protected: - DescribeTableTask(const std::string& table_name, ::milvus::grpc::TableSchema& schema); + DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema &schema); ServerError OnExecute() override; @@ -68,18 +69,18 @@ protected: private: std::string table_name_; - ::milvus::grpc::TableSchema& schema_; + ::milvus::grpc::TableSchema &schema_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DropTableTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const std::string& table_name); + Create(const std::string &table_name); protected: explicit - DropTableTask(const std::string& table_name); + DropTableTask(const std::string &table_name); ServerError OnExecute() override; @@ -93,11 +94,11 @@ private: class BuildIndexTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const std::string& table_name); + Create(const std::string &table_name); protected: explicit - BuildIndexTask(const std::string& table_name); + BuildIndexTask(const std::string &table_name); ServerError OnExecute() override; @@ -111,50 +112,50 @@ private: class ShowTablesTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer); + Create(::grpc::ServerWriter<::milvus::grpc::TableName> &writer); protected: explicit - ShowTablesTask(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer); + ShowTablesTask(::grpc::ServerWriter<::milvus::grpc::TableName> &writer); ServerError OnExecute() override; private: - ::grpc::ServerWriter< ::milvus::grpc::TableName> writer_; + ::grpc::ServerWriter<::milvus::grpc::TableName> writer_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class InsertVectorTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const ::milvus::grpc::InsertInfos& insert_infos, - ::milvus::grpc::VectorIds& record_ids_); + Create(const ::milvus::grpc::InsertInfos &insert_infos, + ::milvus::grpc::VectorIds &record_ids_); protected: - InsertVectorTask(const ::milvus::grpc::InsertInfos& insert_infos, - ::milvus::grpc::VectorIds& record_ids_); + InsertVectorTask(const ::milvus::grpc::InsertInfos &insert_infos, + ::milvus::grpc::VectorIds &record_ids_); ServerError OnExecute() override; private: const ::milvus::grpc::InsertInfos insert_infos_; - ::milvus::grpc::VectorIds& record_ids_; + ::milvus::grpc::VectorIds &record_ids_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class SearchVectorTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const ::milvus::grpc::SearchVectorInfos& searchVectorInfos, - const std::vector& file_id_array, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer); + Create(const ::milvus::grpc::SearchVectorInfos &searchVectorInfos, + const std::vector &file_id_array, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer); protected: - SearchVectorTask(const ::milvus::grpc::SearchVectorInfos& searchVectorInfos, - const std::vector& file_id_array, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer); + SearchVectorTask(const ::milvus::grpc::SearchVectorInfos &searchVectorInfos, + const std::vector &file_id_array, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer); ServerError OnExecute() override; @@ -169,36 +170,36 @@ private: class GetTableRowCountTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const std::string& table_name, int64_t& row_count); + Create(const std::string &table_name, int64_t &row_count); protected: - GetTableRowCountTask(const std::string& table_name, int64_t& row_count); + GetTableRowCountTask(const std::string &table_name, int64_t &row_count); ServerError OnExecute() override; private: std::string table_name_; - int64_t& row_count_; + int64_t &row_count_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class PingTask : public GrpcBaseTask { public: static BaseTaskPtr - Create(const std::string& cmd, std::string& result); + Create(const std::string &cmd, std::string &result); protected: - PingTask(const std::string& cmd, std::string& result); + PingTask(const std::string &cmd, std::string &result); ServerError OnExecute() override; private: std::string cmd_; - std::string& result_; + std::string &result_; }; - +} } } } \ No newline at end of file From 643a85999dfb7719bd920a217c3e97a76df1b282 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Fri, 9 Aug 2019 17:01:55 +0800 Subject: [PATCH 3/3] modify start_server.sh Former-commit-id: 0a020148bf0e9a22fc833f9d197fba63c7cc2377 --- cpp/scripts/start_server.sh | 2 +- cpp/start_server.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/scripts/start_server.sh b/cpp/scripts/start_server.sh index 72e120f2ce..312cef86d6 100755 --- a/cpp/scripts/start_server.sh +++ b/cpp/scripts/start_server.sh @@ -1,4 +1,4 @@ #!/bin/bash -../bin/milvus_grpc_server -c ../conf/server_config.yaml -l ../conf/log_config.conf +../bin/milvus_server -c ../conf/server_config.yaml -l ../conf/log_config.conf diff --git a/cpp/start_server.sh b/cpp/start_server.sh index e215242471..50bf8b84a8 100755 --- a/cpp/start_server.sh +++ b/cpp/start_server.sh @@ -1,4 +1,4 @@ #!/bin/bash -./cmake_build/src/milvus_grpc_server -c ./conf/server_config.yaml -l ./conf/log_config.conf & +./cmake_build/src/milvus_server -c ./conf/server_config.yaml -l ./conf/log_config.conf &