// Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. #include "server/grpc_impl/GrpcServer.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "GrpcRequestHandler.h" #include "config/ServerConfig.h" // #include "server/DBWrapper.h" #include "server/grpc_impl/interceptor/SpanInterceptor.h" #include "utils/Log.h" #include "message_client/ClientV2.h" #include "server/timesync/TimeSync.h" #include "server/delivery/ReqScheduler.h" namespace milvus { namespace server { namespace grpc { constexpr int64_t MESSAGE_SIZE = -1; // this class is to check port occupation during server start class NoReusePortOption : public ::grpc::ServerBuilderOption { public: void UpdateArguments(::grpc::ChannelArguments* args) override { args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0); int grpc_concurrency = 4 * std::thread::hardware_concurrency(); grpc_concurrency = std::max(32, grpc_concurrency); grpc_concurrency = std::min(256, grpc_concurrency); args->SetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS, grpc_concurrency); } void UpdatePlugins(std::vector>* plugins) override { } }; void GrpcServer::Start() { thread_ptr_ = std::make_shared(&GrpcServer::StartService, this); } void GrpcServer::Stop() { StopService(); if (thread_ptr_) { thread_ptr_->join(); thread_ptr_ = nullptr; } } Status GrpcServer::StartService() { SetThreadName("grpcserv_thread"); std::string server_address(config.network.address() + ":" + std::to_string(config.network.port())); ::grpc::ServerBuilder builder; builder.SetOption(std::unique_ptr<::grpc::ServerBuilderOption>(new NoReusePortOption)); builder.SetMaxReceiveMessageSize(MESSAGE_SIZE); // default 4 * 1024 * 1024 builder.SetMaxSendMessageSize(MESSAGE_SIZE); builder.SetCompressionAlgorithmSupportStatus(GRPC_COMPRESS_STREAM_GZIP, true); builder.SetDefaultCompressionAlgorithm(GRPC_COMPRESS_STREAM_GZIP); builder.SetDefaultCompressionLevel(GRPC_COMPRESS_LEVEL_NONE); GrpcRequestHandler service(opentracing::Tracer::Global()); service.RegisterRequestHandler(ReqHandler()); builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials()); builder.RegisterService(&service); // timeSync // client id should same to MessageWrapper int client_id = 0; std::string pulsar_server_addr (std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port())); timesync::TimeSync syc(client_id,GetMessageTimeSyncTime, 20, pulsar_server_addr, "TimeSync"); // Add gRPC interceptor using InterceptorI = ::grpc::experimental::ServerInterceptorFactoryInterface; using InterceptorIPtr = std::unique_ptr; std::vector creators; creators.push_back( std::unique_ptr<::grpc::experimental::ServerInterceptorFactoryInterface>(new SpanInterceptorFactory(&service))); builder.experimental().SetInterceptorCreators(std::move(creators)); server_ptr_ = builder.BuildAndStart(); server_ptr_->Wait(); return Status::OK(); } Status GrpcServer::StopService() { if (server_ptr_ != nullptr) { server_ptr_->Shutdown(); } return Status::OK(); } } // namespace grpc } // namespace server } // namespace milvus