From 6797fd0b0b1884320d14345cbdd51d8044a519c6 Mon Sep 17 00:00:00 2001 From: shengjh <1572099106@qq.com> Date: Mon, 28 Sep 2020 12:12:04 +0800 Subject: [PATCH] Update throughput log for proxy Signed-off-by: shengjh <1572099106@qq.com> --- proxy/src/message_client/ClientV2.cpp | 35 +++++++++++++------ .../src/server/delivery/request/InsertReq.cpp | 20 ++++++----- proxy/src/utils/CommonUtil.cpp | 8 +++++ proxy/src/utils/CommonUtil.h | 4 +++ 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index f3317177c3..fba3d35263 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -168,7 +168,11 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, const std::function &segment_id) { + const uint64_t num_records_log = 100 * 10000; + static uint64_t num_inserted = 0; + static uint64_t size_inserted = 0; using stdclock = std::chrono::high_resolution_clock; + static stdclock::duration time_cost; auto start = stdclock::now(); // may have retry policy? auto row_count = request.rows_data_size(); @@ -210,17 +214,26 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, } auto end = stdclock::now(); - auto data_size = request.ByteSize(); - char buff[128]; - auto r = getcwd(buff, 128); - auto path = std::string(buff); - std::ofstream file(path + "/proxy2pulsar.benchmark", std::fstream::app); - file << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, " - << "cost" << std::chrono::duration_cast(end - start).count() / 1000.0 << "s, " - << "throughput: " - << data_size / std::chrono::duration_cast(end - start).count() * 1000 / 1024.0 - / 1024 - << "M/s" << std::endl; + time_cost += (end - start); + num_inserted += row_count; + size_inserted += request.ByteSize(); + if (num_inserted >= num_records_log) { +// char buff[128]; +// auto r = getcwd(buff, 128); + auto path = std::string("/tmp"); + std::ofstream file(path + "/proxy2pulsar.benchmark", std::fstream::app); + file << "[" << milvus::CommonUtil::TimeToString(start) << "]" + << " Insert " << num_inserted << " records, " + << "size:" << size_inserted / 1024.0 / 1024.0 << "M, " + << "cost" << std::chrono::duration_cast(time_cost).count() / 1000.0 << "s, " + << "throughput: " + << double(size_inserted) / std::chrono::duration_cast(time_cost).count() * 1000 / 1024.0 + / 1024 + << "M/s" << std::endl; + time_cost = stdclock::duration(0); + num_inserted = 0; + size_inserted = 0; + } for (auto &stat : stats) { if (!stat.ok()) { diff --git a/proxy/src/server/delivery/request/InsertReq.cpp b/proxy/src/server/delivery/request/InsertReq.cpp index fce0ffd5e2..fb15e6e8d8 100644 --- a/proxy/src/server/delivery/request/InsertReq.cpp +++ b/proxy/src/server/delivery/request/InsertReq.cpp @@ -24,6 +24,7 @@ #include #include #include +#include "utils/CommonUtil.h" #ifdef ENABLE_CPU_PROFILING #include @@ -60,9 +61,9 @@ InsertReq::OnExecute() { static int log_flag = 0; static bool shouldBenchmark = false; static std::stringstream log; - char buff[128]; - auto r = getcwd(buff, 128); - auto path = std::string(buff); +// char buff[128]; +// auto r = getcwd(buff, 128); + auto path = std::string("/tmp"); std::ofstream file(path + "/proxy.benchmark", std::fstream::app); #endif @@ -89,16 +90,17 @@ InsertReq::OnExecute() { #ifdef BENCHMARK inserted_count += insert_param_->rows_data_size(); inserted_size += insert_param_->ByteSize(); - if (shouldBenchmark && inserted_count > 1000) { + if (shouldBenchmark) { end = stdclock::now(); ready_log_records += inserted_count; auto duration = std::chrono::duration_cast(end - start).count() / 1000.0; if (duration > interval) { - log << "===================>Insert: " - << inserted_count << "records," - << " size: " << inserted_size / MB << "MB" - << " cost: " << duration << "s," - << " throughput: " + log << "[" << milvus::CommonUtil::TimeToString(start) << "] " + << "Insert " + << inserted_count << " records, " + << "size: " << inserted_size / MB << "MB, " + << "cost: " << duration << "s, " + << "throughput: " << double(inserted_size) / duration / MB << "M/s\n"; auto new_flag = ready_log_records / per_log_records; diff --git a/proxy/src/utils/CommonUtil.cpp b/proxy/src/utils/CommonUtil.cpp index 296a2d3078..4d4e394fc3 100644 --- a/proxy/src/utils/CommonUtil.cpp +++ b/proxy/src/utils/CommonUtil.cpp @@ -175,6 +175,14 @@ CommonUtil::TimeStrToTime(const std::string& time_str, time_t& time_integer, tm& return true; } +std::string CommonUtil::TimeToString(std::chrono::high_resolution_clock::time_point t) { + std::time_t tt = std::chrono::system_clock::to_time_t(t); + + char buf[100] = {0}; + std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&tt)); + return std::string(buf); +} + void CommonUtil::ConvertTime(time_t time_integer, tm& time_struct) { localtime_r(&time_integer, &time_struct); diff --git a/proxy/src/utils/CommonUtil.h b/proxy/src/utils/CommonUtil.h index ded802379d..2b34d773ff 100644 --- a/proxy/src/utils/CommonUtil.h +++ b/proxy/src/utils/CommonUtil.h @@ -15,6 +15,7 @@ #include #include +#include namespace milvus { @@ -40,6 +41,9 @@ class CommonUtil { TimeStrToTime(const std::string& time_str, time_t& time_integer, tm& time_struct, const std::string& format = "%d-%d-%d %d:%d:%d"); + static std::string + TimeToString(std::chrono::high_resolution_clock::time_point t); + static void ConvertTime(time_t time_integer, tm& time_struct); static void