From 70f5a44f8ffeeae42d47db711b7558748e1548d6 Mon Sep 17 00:00:00 2001 From: shengjh <1572099106@qq.com> Date: Mon, 21 Sep 2020 19:28:42 +0800 Subject: [PATCH] Use async send in pulsar producer Signed-off-by: shengjh <1572099106@qq.com> --- go.mod | 2 +- proxy/src/message_client/ClientV2.cpp | 50 ++++++++++++------- proxy/src/message_client/Producer.cpp | 19 +++++++ proxy/src/message_client/Producer.h | 3 ++ proxy/src/meta/master/GrpcClient.cpp | 19 ++++++- proxy/src/meta/master/GrpcClient.h | 3 ++ proxy/src/server/MetaWrapper.cpp | 32 +++++++----- proxy/src/server/Server.cpp | 20 ++++++-- proxy/src/server/delivery/ReqHandler.cpp | 5 +- proxy/src/server/delivery/ReqHandler.h | 3 +- .../delivery/request/CreateIndexReq.cpp | 17 ++++++- .../server/delivery/request/CreateIndexReq.h | 7 +++ .../server/grpc_impl/GrpcRequestHandler.cpp | 21 ++++---- proxy/src/server/grpc_impl/GrpcServer.cpp | 9 ---- proxy/thirdparty/grpc/CMakeLists.txt | 2 +- sdk/examples/simple/CreateIndex.cpp | 26 ++++++++++ 16 files changed, 175 insertions(+), 63 deletions(-) create mode 100644 sdk/examples/simple/CreateIndex.cpp diff --git a/go.mod b/go.mod index 253dded8b8..42201b57b7 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/go-kit/kit v0.10.0 // indirect github.com/gogo/protobuf v1.3.1 github.com/golang/mock v1.4.4 // indirect - github.com/golang/protobuf v1.3.2 + github.com/golang/protobuf v1.4.2 github.com/google/btree v1.0.0 github.com/google/martian/v3 v3.0.0 // indirect github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99 // indirect diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index 826c213231..e4f38586f3 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -161,8 +161,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, // may have retry policy? auto row_count = request.rows_data_size(); auto stats = std::vector(ParallelNum); + std::atomic_uint64_t msg_sended = 0; -#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id), num_threads(ParallelNum) +#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id, msg_sended), num_threads(ParallelNum) for (auto i = 0; i < row_count; i++) { milvus::grpc::InsertOrDeleteMsg mut_msg; int this_thread = omp_get_thread_num(); @@ -173,22 +174,28 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, mut_msg.set_collection_name(request.collection_name()); mut_msg.set_partition_tag(request.partition_tag()); uint64_t uid = request.entity_id_array(i); - auto channel_id = makeHash(&uid, sizeof(uint64_t)); + auto channel_id = makeHash(&uid, sizeof(uint64_t)) % 1024; try { mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp)); printf("%ld \n", mut_msg.segment_id()); mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i)); mut_msg.mutable_extra_params()->CopyFrom(request.extra_params()); - auto result = paralle_mut_producers_[this_thread]->send(mut_msg); - if (result != pulsar::ResultOk) { - stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result)); - } + auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){ + msg_sended += 1; + if (result != pulsar::ResultOk) { + stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result)); + } + }; + paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback); } catch (const std::exception &e) { - stats[this_thread] = Status(DB_ERROR, "Meta error"); + stats[this_thread] = Status(DB_ERROR, e.what()); } } + while (msg_sended < row_count){ + } + for (auto &stat : stats) { if (!stat.ok()) { return stat; @@ -202,9 +209,12 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, const std::function &segment_id) { - auto stats = std::vector(ParallelNum); -#pragma omp parallel for default(none), shared( request, timestamp, stats, segment_id), num_threads(ParallelNum) - for (auto i = 0; i < request.id_array_size(); i++) { + auto row_count = request.id_array_size(); + auto stats = std::vector(ParallelNum); + std::atomic_uint64_t msg_sended = 0; + +#pragma omp parallel for default(none), shared( request, timestamp, stats, segment_id, msg_sended, row_count), num_threads(ParallelNum) + for (auto i = 0; i < row_count; i++) { milvus::grpc::InsertOrDeleteMsg mut_msg; mut_msg.set_op(milvus::grpc::OpType::DELETE); mut_msg.set_client_id(client_id_); @@ -212,18 +222,24 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, mut_msg.set_collection_name(request.collection_name()); mut_msg.set_timestamp(timestamp); uint64_t uid = request.id_array(i); - auto channel_id = makeHash(&uid, sizeof(uint64_t)); + auto channel_id = makeHash(&uid, sizeof(uint64_t)) % 1024; mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp)); int this_thread = omp_get_thread_num(); - auto result = paralle_mut_producers_[this_thread]->send(mut_msg); - if (result != pulsar::ResultOk) { - stats[this_thread] = result; - } + auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){ + msg_sended += 1; + if (result != pulsar::ResultOk) { + stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result)); + } + }; + paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback); } + while (msg_sended < row_count){ + } + for (auto &stat : stats) { - if (stat != pulsar::ResultOk) { - return Status(DB_ERROR, pulsar::strResult(stat)); + if (!stat.ok()) { + return stat; } } return Status::OK(); diff --git a/proxy/src/message_client/Producer.cpp b/proxy/src/message_client/Producer.cpp index 0fbb3a0453..470d0bed18 100644 --- a/proxy/src/message_client/Producer.cpp +++ b/proxy/src/message_client/Producer.cpp @@ -19,6 +19,10 @@ namespace message_client { return producer_.send(msg); } + void MsgProducer::sendAsync(const Message &msg, pulsar::SendCallback callback) { + return producer_.sendAsync(msg, callback); + } + Result MsgProducer::send(const std::string &msg) { auto pulsar_msg = pulsar::MessageBuilder().setContent(msg).build(); return send(pulsar_msg); @@ -32,6 +36,14 @@ namespace message_client { return send(pulsar_msg); } + void MsgProducer::sendAsync(const std::string &msg, int64_t partitioned_key, pulsar::SendCallback callback) { + auto pulsar_msg = pulsar::MessageBuilder(). + setContent(msg). + setPartitionKey(std::to_string(partitioned_key)). + build(); + return sendAsync(pulsar_msg, callback); + } + Result MsgProducer::send(milvus::grpc::InsertOrDeleteMsg &msg) { int32_t channel_id = makeHash(std::to_string(msg.uid())) % 1024; // std::cout << "partition id := " << channel_id <CreateIndex(&context, request, &response); + + if (!grpc_status.ok()) { + std::cerr << "CreateIndex gRPC failed!" << grpc_status.error_message() << std::endl; + return Status(grpc_status.error_code(), "CreateIndex gRPC failed!" + grpc_status.error_message()); } if (response.error_code() != grpc::SUCCESS) { diff --git a/proxy/src/meta/master/GrpcClient.h b/proxy/src/meta/master/GrpcClient.h index 45db5a1ed0..bba67b9168 100644 --- a/proxy/src/meta/master/GrpcClient.h +++ b/proxy/src/meta/master/GrpcClient.h @@ -16,6 +16,9 @@ class GrpcClient { Status CreateCollection(const milvus::grpc::Mapping& mapping); + Status + CreateIndex(const milvus::grpc::IndexParam& request); + private: std::unique_ptr stub_; diff --git a/proxy/src/server/MetaWrapper.cpp b/proxy/src/server/MetaWrapper.cpp index 7c1ea3e937..515a058618 100644 --- a/proxy/src/server/MetaWrapper.cpp +++ b/proxy/src/server/MetaWrapper.cpp @@ -49,23 +49,28 @@ MetaWrapper &MetaWrapper::GetInstance() { } Status MetaWrapper::Init() { - etcd_root_path_ = config.etcd.rootpath(); - segment_path_ = etcd_root_path_ + "segment/"; - collection_path_ = etcd_root_path_ + "collection/"; + try { + etcd_root_path_ = config.etcd.rootpath(); + segment_path_ = etcd_root_path_ + "segment/"; + collection_path_ = etcd_root_path_ + "collection/"; - auto master_addr = config.master.address() + ":" + std::to_string(config.master.port()); - master_client_ = std::make_shared(master_addr); + auto master_addr = config.master.address() + ":" + std::to_string(config.master.port()); + master_client_ = std::make_shared(master_addr); - auto etcd_addr = config.etcd.address() + ":" + std::to_string(config.etcd.port()); - etcd_client_ = std::make_shared(etcd_addr); + auto etcd_addr = config.etcd.address() + ":" + std::to_string(config.etcd.port()); + etcd_client_ = std::make_shared(etcd_addr); - // init etcd watcher - auto f = [&](const etcdserverpb::WatchResponse &res) { - UpdateMeta(res); - }; - watcher_ = std::make_shared(etcd_addr, segment_path_, f, true); + // init etcd watcher + auto f = [&](const etcdserverpb::WatchResponse &res) { + UpdateMeta(res); + }; + watcher_ = std::make_shared(etcd_addr, segment_path_, f, true); - SyncMeta(); + SyncMeta(); + } + catch (const std::exception &e) { + return Status(DB_ERROR, "Init meta error"); + } } std::shared_ptr MetaWrapper::MetaClient() { @@ -113,6 +118,7 @@ uint64_t MetaWrapper::AskSegmentId(const std::string &collection_name, uint64_t && segment_info.collection_name() == collection_name) { return segment_info.segment_id(); } + return 0; } throw std::runtime_error("Can't find eligible segment"); } diff --git a/proxy/src/server/Server.cpp b/proxy/src/server/Server.cpp index 026157de29..ce63232da7 100644 --- a/proxy/src/server/Server.cpp +++ b/proxy/src/server/Server.cpp @@ -29,6 +29,8 @@ #include "server/init/StorageChecker.h" #include "src/version.h" #include +#include +#include #include "message_client/ClientV2.h" #include "utils/Log.h" #include "utils/SignalHandler.h" @@ -233,14 +235,20 @@ Server::Stop() { Status Server::StartService() { Status stat; + + // timeSync + // client id should same to MessageWrapper + int client_id = 0; + std::string pulsar_server_addr + (std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port())); + static timesync::TimeSync syc(client_id, GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync"); + // stat = engine::KnowhereResource::Initialize(); if (!stat.ok()) { LOG_SERVER_ERROR_ << "KnowhereResource initialize fail: " << stat.message(); goto FAIL; } - grpc::GrpcServer::GetInstance().Start(); - // Init pulsar message client stat = MessageWrapper::GetInstance().Init(); if (!stat.ok()) { @@ -248,7 +256,13 @@ Server::StartService() { goto FAIL; } - MetaWrapper::GetInstance().Init(); + stat = MetaWrapper::GetInstance().Init(); + if (!stat.ok()) { + LOG_SERVER_ERROR_ << "Meta start service fail: " << stat.message(); + goto FAIL; + } + + grpc::GrpcServer::GetInstance().Start(); return Status::OK(); FAIL: diff --git a/proxy/src/server/delivery/ReqHandler.cpp b/proxy/src/server/delivery/ReqHandler.cpp index 17b504709f..201e0ff9d2 100644 --- a/proxy/src/server/delivery/ReqHandler.cpp +++ b/proxy/src/server/delivery/ReqHandler.cpp @@ -123,9 +123,8 @@ ReqHandler::ListPartitions(const ContextPtr& context, const std::string& collect } Status -ReqHandler::CreateIndex(const ContextPtr& context, const std::string& collection_name, const std::string& field_name, - const std::string& index_name, const milvus::json& json_params) { - BaseReqPtr req_ptr = CreateIndexReq::Create(context, collection_name, field_name, index_name, json_params); +ReqHandler::CreateIndex(const ContextPtr& context, const ::milvus::grpc::IndexParam *request) { + BaseReqPtr req_ptr = CreateIndexReq::Create(context, request); ReqScheduler::ExecReq(req_ptr); return req_ptr->status(); } diff --git a/proxy/src/server/delivery/ReqHandler.h b/proxy/src/server/delivery/ReqHandler.h index df97bcaf4e..e51b743f0b 100644 --- a/proxy/src/server/delivery/ReqHandler.h +++ b/proxy/src/server/delivery/ReqHandler.h @@ -65,8 +65,7 @@ class ReqHandler { ListPartitions(const ContextPtr& context, const std::string& collection_name, std::vector& partitions); Status - CreateIndex(const ContextPtr& context, const std::string& collection_name, const std::string& field_name, - const std::string& index_name, const milvus::json& json_params); + CreateIndex(const ContextPtr& context, const ::milvus::grpc::IndexParam *request); Status DescribeIndex(const ContextPtr& context, const std::string& collection_name, const std::string& field_name, diff --git a/proxy/src/server/delivery/request/CreateIndexReq.cpp b/proxy/src/server/delivery/request/CreateIndexReq.cpp index b1324e1960..ed6cefc951 100644 --- a/proxy/src/server/delivery/request/CreateIndexReq.cpp +++ b/proxy/src/server/delivery/request/CreateIndexReq.cpp @@ -18,6 +18,7 @@ #include #include #include +#include "server/MetaWrapper.h" namespace milvus { namespace server { @@ -32,16 +33,28 @@ CreateIndexReq::CreateIndexReq(const ContextPtr& context, const std::string& col json_params_(json_params) { } +CreateIndexReq::CreateIndexReq(const ContextPtr& context, const ::milvus::grpc::IndexParam *request) + : BaseReq(context, ReqType::kCreateIndex), + request_(request){} + BaseReqPtr CreateIndexReq::Create(const ContextPtr& context, const std::string& collection_name, const std::string& field_name, const std::string& index_name, const milvus::json& json_params) { return std::shared_ptr(new CreateIndexReq(context, collection_name, field_name, index_name, json_params)); } +BaseReqPtr +CreateIndexReq::Create(const ContextPtr& context, const ::milvus::grpc::IndexParam *request){ + return std::shared_ptr(new CreateIndexReq(context, request)); +} + Status CreateIndexReq::OnExecute() { - - return Status::OK(); + auto status = MetaWrapper::GetInstance().MetaClient()->CreateIndex(*request_); + if (status.ok()){ + status = MetaWrapper::GetInstance().SyncMeta(); + } + return status; } diff --git a/proxy/src/server/delivery/request/CreateIndexReq.h b/proxy/src/server/delivery/request/CreateIndexReq.h index d7f6081a97..a9e367707b 100644 --- a/proxy/src/server/delivery/request/CreateIndexReq.h +++ b/proxy/src/server/delivery/request/CreateIndexReq.h @@ -24,10 +24,15 @@ class CreateIndexReq : public BaseReq { Create(const ContextPtr& context, const std::string& collection_name, const std::string& field_name, const std::string& index_name, const milvus::json& json_params); + static BaseReqPtr + Create(const ContextPtr& context, const ::milvus::grpc::IndexParam *request); + protected: CreateIndexReq(const ContextPtr& context, const std::string& collection_name, const std::string& field_name, const std::string& index_name, const milvus::json& json_params); + CreateIndexReq(const ContextPtr& context, const ::milvus::grpc::IndexParam *request); + Status OnExecute() override; @@ -36,6 +41,8 @@ class CreateIndexReq : public BaseReq { const std::string field_name_; const std::string index_name_; const milvus::json json_params_; + + const ::milvus::grpc::IndexParam *request_; }; } // namespace server diff --git a/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp b/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp index 2fb869a052..2f694c8c92 100644 --- a/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/proxy/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -384,18 +384,17 @@ GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, const ::milvus:: CHECK_NULLPTR_RETURN(request) LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__); - milvus::json json_params; - for (int i = 0; i < request->extra_params_size(); i++) { - const ::milvus::grpc::KeyValuePair &extra = request->extra_params(i); - if (extra.key() == EXTRA_PARAM_KEY) { - json_params[EXTRA_PARAM_KEY] = json::parse(extra.value()); - } else { - json_params[extra.key()] = extra.value(); - } - } +// milvus::json json_params; +// for (int i = 0; i < request->extra_params_size(); i++) { +// const ::milvus::grpc::KeyValuePair &extra = request->extra_params(i); +// if (extra.key() == EXTRA_PARAM_KEY) { +// json_params[EXTRA_PARAM_KEY] = json::parse(extra.value()); +// } else { +// json_params[extra.key()] = extra.value(); +// } +// } - Status status = req_handler_.CreateIndex(GetContext(context), request->collection_name(), request->field_name(), - request->index_name(), json_params); + Status status = req_handler_.CreateIndex(GetContext(context), request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__); SET_RESPONSE(response, status, context); diff --git a/proxy/src/server/grpc_impl/GrpcServer.cpp b/proxy/src/server/grpc_impl/GrpcServer.cpp index 67cba3eb1a..55a8415fd1 100644 --- a/proxy/src/server/grpc_impl/GrpcServer.cpp +++ b/proxy/src/server/grpc_impl/GrpcServer.cpp @@ -99,15 +99,6 @@ GrpcServer::StartService() { builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials()); builder.RegisterService(&service); - - // timeSync - // client id should same to MessageWrapper - int client_id = 0; - std::string pulsar_server_addr - (std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port())); - timesync::TimeSync syc(client_id,GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync"); - - // Add gRPC interceptor using InterceptorI = ::grpc::experimental::ServerInterceptorFactoryInterface; using InterceptorIPtr = std::unique_ptr; diff --git a/proxy/thirdparty/grpc/CMakeLists.txt b/proxy/thirdparty/grpc/CMakeLists.txt index 9bb76a43de..b39512ff57 100644 --- a/proxy/thirdparty/grpc/CMakeLists.txt +++ b/proxy/thirdparty/grpc/CMakeLists.txt @@ -64,7 +64,7 @@ add_custom_target(generate_suvlim_pb_grpc ALL DEPENDS protoc grpc_cpp_plugin) add_custom_command(TARGET generate_suvlim_pb_grpc POST_BUILD COMMAND echo "${PROTOC_EXCUTABLE}" -# COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}" + COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}" COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_cpp.sh" -p "${PROTOC_EXCUTABLE}" -g "${GRPC_CPP_PLUGIN_EXCUTABLE}" COMMAND ${PROTOC_EXCUTABLE} -I "${PROTO_PATH}/proto" --grpc_out "${PROTO_PATH}" --cpp_out "${PROTO_PATH}" --plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXCUTABLE}" diff --git a/sdk/examples/simple/CreateIndex.cpp b/sdk/examples/simple/CreateIndex.cpp new file mode 100644 index 0000000000..24a1dbb63e --- /dev/null +++ b/sdk/examples/simple/CreateIndex.cpp @@ -0,0 +1,26 @@ +#include +#include "interface/ConnectionImpl.h" +#include "utils/Utils.h" + +const std::string COLLECTION = "collection_0"; + +int main(int argc, char *argv[]) { + TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv); + if (!parameters.is_valid){ + return 0; + } + auto client = milvus::ConnectionImpl(); + milvus::ConnectParam connect_param; + connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1":parameters.address_; + connect_param.port = parameters.port_.empty() ? "19530":parameters.port_ ; + + client.Connect(connect_param); + + JSON json_params = {{"index_type", "IVF_FLAT"}, {"metric_type", "L2"}, {"params", {{"nlist", 100}}}}; + milvus::IndexParam index1 = {COLLECTION, "field_vec", json_params.dump()}; + milvus_sdk::Utils::PrintIndexParam(index1); + milvus::Status stat = client.CreateIndex(index1); + std::cout << "CreateIndex function call status: " << stat.message() << std::endl; + + return 0; +} \ No newline at end of file