Use async send in pulsar producer

Signed-off-by: shengjh <1572099106@qq.com>
This commit is contained in:
shengjh 2020-09-21 19:28:42 +08:00 committed by yefu.chen
parent b583f990e4
commit 70f5a44f8f
16 changed files with 175 additions and 63 deletions

2
go.mod
View File

@ -21,7 +21,7 @@ require (
github.com/go-kit/kit v0.10.0 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.4 // indirect
github.com/golang/protobuf v1.3.2
github.com/golang/protobuf v1.4.2
github.com/google/btree v1.0.0
github.com/google/martian/v3 v3.0.0 // indirect
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99 // indirect

View File

@ -161,8 +161,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
// may have retry policy?
auto row_count = request.rows_data_size();
auto stats = std::vector<Status>(ParallelNum);
std::atomic_uint64_t msg_sended = 0;
#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id), num_threads(ParallelNum)
#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id, msg_sended), num_threads(ParallelNum)
for (auto i = 0; i < row_count; i++) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
int this_thread = omp_get_thread_num();
@ -173,22 +174,28 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_partition_tag(request.partition_tag());
uint64_t uid = request.entity_id_array(i);
auto channel_id = makeHash(&uid, sizeof(uint64_t));
auto channel_id = makeHash(&uid, sizeof(uint64_t)) % 1024;
try {
mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
printf("%ld \n", mut_msg.segment_id());
mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i));
mut_msg.mutable_extra_params()->CopyFrom(request.extra_params());
auto result = paralle_mut_producers_[this_thread]->send(mut_msg);
if (result != pulsar::ResultOk) {
stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result));
}
auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){
msg_sended += 1;
if (result != pulsar::ResultOk) {
stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result));
}
};
paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback);
}
catch (const std::exception &e) {
stats[this_thread] = Status(DB_ERROR, "Meta error");
stats[this_thread] = Status(DB_ERROR, e.what());
}
}
while (msg_sended < row_count){
}
for (auto &stat : stats) {
if (!stat.ok()) {
return stat;
@ -202,9 +209,12 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
const std::function<uint64_t(const std::string &collection_name,
uint64_t channel_id,
uint64_t timestamp)> &segment_id) {
auto stats = std::vector<pulsar::Result>(ParallelNum);
#pragma omp parallel for default(none), shared( request, timestamp, stats, segment_id), num_threads(ParallelNum)
for (auto i = 0; i < request.id_array_size(); i++) {
auto row_count = request.id_array_size();
auto stats = std::vector<Status>(ParallelNum);
std::atomic_uint64_t msg_sended = 0;
#pragma omp parallel for default(none), shared( request, timestamp, stats, segment_id, msg_sended, row_count), num_threads(ParallelNum)
for (auto i = 0; i < row_count; i++) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
mut_msg.set_op(milvus::grpc::OpType::DELETE);
mut_msg.set_client_id(client_id_);
@ -212,18 +222,24 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_timestamp(timestamp);
uint64_t uid = request.id_array(i);
auto channel_id = makeHash(&uid, sizeof(uint64_t));
auto channel_id = makeHash(&uid, sizeof(uint64_t)) % 1024;
mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
int this_thread = omp_get_thread_num();
auto result = paralle_mut_producers_[this_thread]->send(mut_msg);
if (result != pulsar::ResultOk) {
stats[this_thread] = result;
}
auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){
msg_sended += 1;
if (result != pulsar::ResultOk) {
stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result));
}
};
paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback);
}
while (msg_sended < row_count){
}
for (auto &stat : stats) {
if (stat != pulsar::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(stat));
if (!stat.ok()) {
return stat;
}
}
return Status::OK();

View File

@ -19,6 +19,10 @@ namespace message_client {
return producer_.send(msg);
}
void MsgProducer::sendAsync(const Message &msg, pulsar::SendCallback callback) {
return producer_.sendAsync(msg, callback);
}
Result MsgProducer::send(const std::string &msg) {
auto pulsar_msg = pulsar::MessageBuilder().setContent(msg).build();
return send(pulsar_msg);
@ -32,6 +36,14 @@ namespace message_client {
return send(pulsar_msg);
}
void MsgProducer::sendAsync(const std::string &msg, int64_t partitioned_key, pulsar::SendCallback callback) {
auto pulsar_msg = pulsar::MessageBuilder().
setContent(msg).
setPartitionKey(std::to_string(partitioned_key)).
build();
return sendAsync(pulsar_msg, callback);
}
Result MsgProducer::send(milvus::grpc::InsertOrDeleteMsg &msg) {
int32_t channel_id = makeHash(std::to_string(msg.uid())) % 1024;
// std::cout << "partition id := " << channel_id <<std::endl;
@ -40,6 +52,13 @@ namespace message_client {
return send(msg_str, msg.uid());
}
void MsgProducer::sendAsync(milvus::grpc::InsertOrDeleteMsg &msg, pulsar::SendCallback callback) {
int32_t channel_id = makeHash(std::to_string(msg.uid())) % 1024;
msg.set_channel_id(channel_id);
auto msg_str = msg.SerializeAsString();
return sendAsync(msg_str, msg.uid(), callback);
}
Result MsgProducer::send(milvus::grpc::SearchMsg &msg) {
auto msg_str = msg.SerializeAsString();
return send(msg_str, msg.uid());

View File

@ -17,9 +17,12 @@ class MsgProducer {
Result createProducer(const std::string &topic);
Result send(const Message &msg);
void sendAsync(const Message &msg, pulsar::SendCallback callback);
Result send(const std::string &msg);
Result send(const std::string &msg, const int64_t partitioned_key);
void sendAsync(const std::string &msg, int64_t partitioned_key, pulsar::SendCallback callback);
Result send(milvus::grpc::InsertOrDeleteMsg &msg);
void sendAsync(milvus::grpc::InsertOrDeleteMsg &msg, pulsar::SendCallback callback);
Result send(milvus::grpc::SearchMsg &msg);
// Result send(milvus::grpc::EntityIdentity &msg);
Result send(milvus::grpc::TimeSyncMsg & msg);

View File

@ -21,7 +21,24 @@ Status GrpcClient::CreateCollection(const milvus::grpc::Mapping &mapping) {
if (!grpc_status.ok()) {
std::cerr << "CreateHybridCollection gRPC failed!" << grpc_status.error_message() << std::endl;
return Status(grpc_status.error_code(), grpc_status.error_message());
return Status(grpc_status.error_code(), "CreateHybridCollection gRPC failed!" + grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {
// TODO: LOG
return Status(response.error_code(), response.reason());
}
return Status::OK();
}
Status GrpcClient::CreateIndex(const milvus::grpc::IndexParam& request) {
ClientContext context;
::milvus::grpc::Status response;
::grpc::Status grpc_status = stub_->CreateIndex(&context, request, &response);
if (!grpc_status.ok()) {
std::cerr << "CreateIndex gRPC failed!" << grpc_status.error_message() << std::endl;
return Status(grpc_status.error_code(), "CreateIndex gRPC failed!" + grpc_status.error_message());
}
if (response.error_code() != grpc::SUCCESS) {

View File

@ -16,6 +16,9 @@ class GrpcClient {
Status
CreateCollection(const milvus::grpc::Mapping& mapping);
Status
CreateIndex(const milvus::grpc::IndexParam& request);
private:
std::unique_ptr<masterpb::Master::Stub> stub_;

View File

@ -49,23 +49,28 @@ MetaWrapper &MetaWrapper::GetInstance() {
}
Status MetaWrapper::Init() {
etcd_root_path_ = config.etcd.rootpath();
segment_path_ = etcd_root_path_ + "segment/";
collection_path_ = etcd_root_path_ + "collection/";
try {
etcd_root_path_ = config.etcd.rootpath();
segment_path_ = etcd_root_path_ + "segment/";
collection_path_ = etcd_root_path_ + "collection/";
auto master_addr = config.master.address() + ":" + std::to_string(config.master.port());
master_client_ = std::make_shared<milvus::master::GrpcClient>(master_addr);
auto master_addr = config.master.address() + ":" + std::to_string(config.master.port());
master_client_ = std::make_shared<milvus::master::GrpcClient>(master_addr);
auto etcd_addr = config.etcd.address() + ":" + std::to_string(config.etcd.port());
etcd_client_ = std::make_shared<milvus::master::EtcdClient>(etcd_addr);
auto etcd_addr = config.etcd.address() + ":" + std::to_string(config.etcd.port());
etcd_client_ = std::make_shared<milvus::master::EtcdClient>(etcd_addr);
// init etcd watcher
auto f = [&](const etcdserverpb::WatchResponse &res) {
UpdateMeta(res);
};
watcher_ = std::make_shared<milvus::master::Watcher>(etcd_addr, segment_path_, f, true);
// init etcd watcher
auto f = [&](const etcdserverpb::WatchResponse &res) {
UpdateMeta(res);
};
watcher_ = std::make_shared<milvus::master::Watcher>(etcd_addr, segment_path_, f, true);
SyncMeta();
SyncMeta();
}
catch (const std::exception &e) {
return Status(DB_ERROR, "Init meta error");
}
}
std::shared_ptr<milvus::master::GrpcClient> MetaWrapper::MetaClient() {
@ -113,6 +118,7 @@ uint64_t MetaWrapper::AskSegmentId(const std::string &collection_name, uint64_t
&& segment_info.collection_name() == collection_name) {
return segment_info.segment_id();
}
return 0;
}
throw std::runtime_error("Can't find eligible segment");
}

View File

@ -29,6 +29,8 @@
#include "server/init/StorageChecker.h"
#include "src/version.h"
#include <yaml-cpp/yaml.h>
#include <src/server/timesync/TimeSync.h>
#include <src/server/delivery/ReqScheduler.h>
#include "message_client/ClientV2.h"
#include "utils/Log.h"
#include "utils/SignalHandler.h"
@ -233,14 +235,20 @@ Server::Stop() {
Status
Server::StartService() {
Status stat;
// timeSync
// client id should same to MessageWrapper
int client_id = 0;
std::string pulsar_server_addr
(std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
static timesync::TimeSync syc(client_id, GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync");
// stat = engine::KnowhereResource::Initialize();
if (!stat.ok()) {
LOG_SERVER_ERROR_ << "KnowhereResource initialize fail: " << stat.message();
goto FAIL;
}
grpc::GrpcServer::GetInstance().Start();
// Init pulsar message client
stat = MessageWrapper::GetInstance().Init();
if (!stat.ok()) {
@ -248,7 +256,13 @@ Server::StartService() {
goto FAIL;
}
MetaWrapper::GetInstance().Init();
stat = MetaWrapper::GetInstance().Init();
if (!stat.ok()) {
LOG_SERVER_ERROR_ << "Meta start service fail: " << stat.message();
goto FAIL;
}
grpc::GrpcServer::GetInstance().Start();
return Status::OK();
FAIL:

View File

@ -123,9 +123,8 @@ ReqHandler::ListPartitions(const ContextPtr& context, const std::string& collect
}
Status
ReqHandler::CreateIndex(const ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const std::string& index_name, const milvus::json& json_params) {
BaseReqPtr req_ptr = CreateIndexReq::Create(context, collection_name, field_name, index_name, json_params);
ReqHandler::CreateIndex(const ContextPtr& context, const ::milvus::grpc::IndexParam *request) {
BaseReqPtr req_ptr = CreateIndexReq::Create(context, request);
ReqScheduler::ExecReq(req_ptr);
return req_ptr->status();
}

View File

@ -65,8 +65,7 @@ class ReqHandler {
ListPartitions(const ContextPtr& context, const std::string& collection_name, std::vector<std::string>& partitions);
Status
CreateIndex(const ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const std::string& index_name, const milvus::json& json_params);
CreateIndex(const ContextPtr& context, const ::milvus::grpc::IndexParam *request);
Status
DescribeIndex(const ContextPtr& context, const std::string& collection_name, const std::string& field_name,

View File

@ -18,6 +18,7 @@
#include <string>
#include <unordered_map>
#include <vector>
#include "server/MetaWrapper.h"
namespace milvus {
namespace server {
@ -32,16 +33,28 @@ CreateIndexReq::CreateIndexReq(const ContextPtr& context, const std::string& col
json_params_(json_params) {
}
CreateIndexReq::CreateIndexReq(const ContextPtr& context, const ::milvus::grpc::IndexParam *request)
: BaseReq(context, ReqType::kCreateIndex),
request_(request){}
BaseReqPtr
CreateIndexReq::Create(const ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const std::string& index_name, const milvus::json& json_params) {
return std::shared_ptr<BaseReq>(new CreateIndexReq(context, collection_name, field_name, index_name, json_params));
}
BaseReqPtr
CreateIndexReq::Create(const ContextPtr& context, const ::milvus::grpc::IndexParam *request){
return std::shared_ptr<BaseReq>(new CreateIndexReq(context, request));
}
Status
CreateIndexReq::OnExecute() {
return Status::OK();
auto status = MetaWrapper::GetInstance().MetaClient()->CreateIndex(*request_);
if (status.ok()){
status = MetaWrapper::GetInstance().SyncMeta();
}
return status;
}

View File

@ -24,10 +24,15 @@ class CreateIndexReq : public BaseReq {
Create(const ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const std::string& index_name, const milvus::json& json_params);
static BaseReqPtr
Create(const ContextPtr& context, const ::milvus::grpc::IndexParam *request);
protected:
CreateIndexReq(const ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const std::string& index_name, const milvus::json& json_params);
CreateIndexReq(const ContextPtr& context, const ::milvus::grpc::IndexParam *request);
Status
OnExecute() override;
@ -36,6 +41,8 @@ class CreateIndexReq : public BaseReq {
const std::string field_name_;
const std::string index_name_;
const milvus::json json_params_;
const ::milvus::grpc::IndexParam *request_;
};
} // namespace server

View File

@ -384,18 +384,17 @@ GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, const ::milvus::
CHECK_NULLPTR_RETURN(request)
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
milvus::json json_params;
for (int i = 0; i < request->extra_params_size(); i++) {
const ::milvus::grpc::KeyValuePair &extra = request->extra_params(i);
if (extra.key() == EXTRA_PARAM_KEY) {
json_params[EXTRA_PARAM_KEY] = json::parse(extra.value());
} else {
json_params[extra.key()] = extra.value();
}
}
// milvus::json json_params;
// for (int i = 0; i < request->extra_params_size(); i++) {
// const ::milvus::grpc::KeyValuePair &extra = request->extra_params(i);
// if (extra.key() == EXTRA_PARAM_KEY) {
// json_params[EXTRA_PARAM_KEY] = json::parse(extra.value());
// } else {
// json_params[extra.key()] = extra.value();
// }
// }
Status status = req_handler_.CreateIndex(GetContext(context), request->collection_name(), request->field_name(),
request->index_name(), json_params);
Status status = req_handler_.CreateIndex(GetContext(context), request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response, status, context);

View File

@ -99,15 +99,6 @@ GrpcServer::StartService() {
builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials());
builder.RegisterService(&service);
// timeSync
// client id should same to MessageWrapper
int client_id = 0;
std::string pulsar_server_addr
(std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
timesync::TimeSync syc(client_id,GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync");
// Add gRPC interceptor
using InterceptorI = ::grpc::experimental::ServerInterceptorFactoryInterface;
using InterceptorIPtr = std::unique_ptr<InterceptorI>;

View File

@ -64,7 +64,7 @@ add_custom_target(generate_suvlim_pb_grpc ALL DEPENDS protoc grpc_cpp_plugin)
add_custom_command(TARGET generate_suvlim_pb_grpc
POST_BUILD
COMMAND echo "${PROTOC_EXCUTABLE}"
# COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}"
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}"
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_cpp.sh" -p "${PROTOC_EXCUTABLE}" -g "${GRPC_CPP_PLUGIN_EXCUTABLE}"
COMMAND ${PROTOC_EXCUTABLE} -I "${PROTO_PATH}/proto" --grpc_out "${PROTO_PATH}" --cpp_out "${PROTO_PATH}"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXCUTABLE}"

View File

@ -0,0 +1,26 @@
#include <string>
#include "interface/ConnectionImpl.h"
#include "utils/Utils.h"
const std::string COLLECTION = "collection_0";
int main(int argc, char *argv[]) {
TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv);
if (!parameters.is_valid){
return 0;
}
auto client = milvus::ConnectionImpl();
milvus::ConnectParam connect_param;
connect_param.ip_address = parameters.address_.empty() ? "127.0.0.1":parameters.address_;
connect_param.port = parameters.port_.empty() ? "19530":parameters.port_ ;
client.Connect(connect_param);
JSON json_params = {{"index_type", "IVF_FLAT"}, {"metric_type", "L2"}, {"params", {{"nlist", 100}}}};
milvus::IndexParam index1 = {COLLECTION, "field_vec", json_params.dump()};
milvus_sdk::Utils::PrintIndexParam(index1);
milvus::Status stat = client.CreateIndex(index1);
std::cout << "CreateIndex function call status: " << stat.message() << std::endl;
return 0;
}