enhance: add more metrics for async cgo component (#40232)

issue: #40014
pr: #40136

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-03-05 09:16:00 +08:00 committed by GitHub
parent f02e549840
commit 34f2bc0a68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 189 additions and 5 deletions

View File

@ -12,6 +12,7 @@
#include <chrono> #include <chrono>
#include "Executor.h" #include "Executor.h"
#include "common/Common.h" #include "common/Common.h"
#include "monitor/prometheus_client.h"
namespace milvus::futures { namespace milvus::futures {
@ -19,8 +20,9 @@ const int kNumPriority = 3;
folly::CPUThreadPoolExecutor* folly::CPUThreadPoolExecutor*
getGlobalCPUExecutor() { getGlobalCPUExecutor() {
auto thread_num = std::thread::hardware_concurrency();
static folly::CPUThreadPoolExecutor executor( static folly::CPUThreadPoolExecutor executor(
std::thread::hardware_concurrency(), thread_num,
folly::CPUThreadPoolExecutor::makeDefaultPriorityQueue(kNumPriority), folly::CPUThreadPoolExecutor::makeDefaultPriorityQueue(kNumPriority),
std::make_shared<folly::NamedThreadFactory>("MILVUS_FUTURE_CPU_")); std::make_shared<folly::NamedThreadFactory>("MILVUS_FUTURE_CPU_"));
return &executor; return &executor;

View File

@ -19,9 +19,106 @@
#include "future_c_types.h" #include "future_c_types.h"
#include "LeakyResult.h" #include "LeakyResult.h"
#include "Ready.h" #include "Ready.h"
#include "pb/cgo_msg.pb.h"
#include <chrono>
#include "monitor/prometheus_client.h"
namespace milvus::futures { namespace milvus::futures {
template <class Duration>
class Metrics;
template <class Duration>
class Metrics {
public:
class ExecutionGuard {
public:
explicit ExecutionGuard(Metrics& metrics) : metrics_(metrics) {
metrics.startExecute();
}
ExecutionGuard(const ExecutionGuard&) = delete;
ExecutionGuard(const ExecutionGuard&&) = delete;
ExecutionGuard&
operator=(const ExecutionGuard&) = delete;
ExecutionGuard&
operator=(const ExecutionGuard&&) = delete;
~ExecutionGuard() {
metrics_.executeDone();
}
private:
Metrics& metrics_;
};
explicit Metrics()
: time_point_(std::chrono::steady_clock::now()),
queue_duration_(0),
execute_duration_(0),
cancelled_before_execute_(false) {
milvus::monitor::internal_cgo_inflight_task_total_all.Increment();
}
Metrics(const Metrics&) = delete;
Metrics(const Metrics&&) = delete;
Metrics&
operator=(const Metrics&) = delete;
Metrics&
operator=(const Metrics&&) = delete;
~Metrics() {
milvus::monitor::internal_cgo_inflight_task_total_all.Decrement();
milvus::monitor::internal_cgo_queue_duration_seconds_all.Observe(
std::chrono::duration<double>(queue_duration_).count());
if (cancelled_before_execute_) {
milvus::monitor::internal_cgo_cancel_before_execute_total_all
.Increment();
} else {
milvus::monitor::internal_cgo_executing_task_total_all.Decrement();
milvus::monitor::internal_cgo_execute_duration_seconds_all.Observe(
std::chrono::duration<double>(execute_duration_).count());
}
}
void
withCancel() {
queue_duration_ = std::chrono::duration_cast<Duration>(
std::chrono::steady_clock::now() - time_point_);
cancelled_before_execute_ = true;
}
void
startExecute() {
auto now = std::chrono::steady_clock::now();
queue_duration_ =
std::chrono::duration_cast<Duration>(now - time_point_);
time_point_ = now;
milvus::monitor::internal_cgo_executing_task_total_all.Increment();
}
void
executeDone() {
auto now = std::chrono::steady_clock::now();
execute_duration_ =
std::chrono::duration_cast<Duration>(now - time_point_);
}
private:
std::chrono::steady_clock::time_point time_point_;
Duration queue_duration_;
Duration execute_duration_;
bool cancelled_before_execute_;
};
// FutureResult is a struct that represents the result of the future.
class FutureResult {
public:
void* result;
CStatus status;
Metrics<std::chrono::microseconds> metrics;
};
/// @brief a virtual class that represents a future can be polymorphic called by CGO code. /// @brief a virtual class that represents a future can be polymorphic called by CGO code.
/// implemented by Future template. /// implemented by Future template.
class IFuture { class IFuture {
@ -103,7 +200,8 @@ class Future : public IFuture {
/// use `async`. /// use `async`.
Future() Future()
: ready_(std::make_shared<Ready<LeakyResult<R>>>()), : metrics_(),
ready_(std::make_shared<Ready<LeakyResult<R>>>()),
promise_(std::make_shared<folly::SharedPromise<R*>>()), promise_(std::make_shared<folly::SharedPromise<R*>>()),
cancellation_source_() { cancellation_source_() {
} }
@ -176,8 +274,15 @@ class Future : public IFuture {
auto cancellation_token = auto cancellation_token =
CancellationToken(cancellation_source_.getToken()); CancellationToken(cancellation_source_.getToken());
auto runner = [fn = std::forward<Fn>(fn), auto runner = [fn = std::forward<Fn>(fn),
cancellation_token = std::move(cancellation_token)]() { cancellation_token = std::move(cancellation_token),
cancellation_token.throwIfCancelled(); this]() {
if (cancellation_token.isCancellationRequested()) {
metrics_.withCancel();
throw folly::FutureCancellation();
}
auto executionGuard =
Metrics<std::chrono::microseconds>::ExecutionGuard(metrics_);
return fn(cancellation_token); return fn(cancellation_token);
}; };
@ -220,9 +325,9 @@ class Future : public IFuture {
} }
private: private:
Metrics<std::chrono::microseconds> metrics_;
std::shared_ptr<Ready<LeakyResult<R>>> ready_; std::shared_ptr<Ready<LeakyResult<R>>> ready_;
std::shared_ptr<folly::SharedPromise<R*>> promise_; std::shared_ptr<folly::SharedPromise<R*>> promise_;
folly::CancellationSource cancellation_source_; folly::CancellationSource cancellation_source_;
}; };
}; // namespace milvus::futures }; // namespace milvus::futures

View File

@ -16,6 +16,7 @@
#include "Future.h" #include "Future.h"
#include "Executor.h" #include "Executor.h"
#include "log/Log.h" #include "log/Log.h"
#include "monitor/prometheus_client.h"
extern "C" void extern "C" void
future_cancel(CFuture* future) { future_cancel(CFuture* future) {
@ -55,6 +56,7 @@ future_destroy(CFuture* future) {
extern "C" void extern "C" void
executor_set_thread_num(int thread_num) { executor_set_thread_num(int thread_num) {
milvus::futures::getGlobalCPUExecutor()->setNumThreads(thread_num); milvus::futures::getGlobalCPUExecutor()->setNumThreads(thread_num);
milvus::monitor::internal_cgo_pool_size_all.Set(thread_num);
LOG_INFO("future executor setup cpu executor with thread num: {}", LOG_INFO("future executor setup cpu executor with thread num: {}",
thread_num); thread_num);
} }

View File

@ -10,9 +10,30 @@
// or implied. See the License for the specific language governing permissions and limitations under the License. // or implied. See the License for the specific language governing permissions and limitations under the License.
#include "prometheus_client.h" #include "prometheus_client.h"
#include <chrono>
namespace milvus::monitor { namespace milvus::monitor {
const prometheus::Histogram::BucketBoundaries secondsBuckets = {
std::chrono::duration<float>(std::chrono::microseconds(10)).count(),
std::chrono::duration<float>(std::chrono::microseconds(50)).count(),
std::chrono::duration<float>(std::chrono::microseconds(100)).count(),
std::chrono::duration<float>(std::chrono::microseconds(250)).count(),
std::chrono::duration<float>(std::chrono::microseconds(500)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(1)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(5)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(10)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(20)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(50)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(100)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(200)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(500)).count(),
std::chrono::duration<float>(std::chrono::seconds(1)).count(),
std::chrono::duration<float>(std::chrono::seconds(2)).count(),
std::chrono::duration<float>(std::chrono::seconds(5)).count(),
std::chrono::duration<float>(std::chrono::seconds(10)).count(),
};
const prometheus::Histogram::BucketBoundaries buckets = {1, const prometheus::Histogram::BucketBoundaries buckets = {1,
2, 2,
4, 4,
@ -255,4 +276,44 @@ DEFINE_PROMETHEUS_GAUGE(internal_mmap_in_used_count_anon,
DEFINE_PROMETHEUS_GAUGE(internal_mmap_in_used_count_file, DEFINE_PROMETHEUS_GAUGE(internal_mmap_in_used_count_file,
internal_mmap_in_used_count, internal_mmap_in_used_count,
mmapAllocatedCountFileLabel) mmapAllocatedCountFileLabel)
// async cgo metrics
DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_queue_duration_seconds,
"[cpp]async cgo queue duration");
DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS(
internal_cgo_queue_duration_seconds_all,
internal_cgo_queue_duration_seconds,
{},
secondsBuckets);
DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_execute_duration_seconds,
"[cpp]async execute duration");
DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS(
internal_cgo_execute_duration_seconds_all,
internal_cgo_execute_duration_seconds,
{},
secondsBuckets);
DEFINE_PROMETHEUS_COUNTER_FAMILY(internal_cgo_cancel_before_execute_total,
"[cpp]async cgo cancel before execute count");
DEFINE_PROMETHEUS_COUNTER(internal_cgo_cancel_before_execute_total_all,
internal_cgo_cancel_before_execute_total,
{});
DEFINE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_pool_size,
"[cpp]async cgo pool size");
DEFINE_PROMETHEUS_GAUGE(internal_cgo_pool_size_all, internal_cgo_pool_size, {});
DEFINE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_inflight_task_total,
"[cpp]async cgo inflight task");
DEFINE_PROMETHEUS_GAUGE(internal_cgo_inflight_task_total_all,
internal_cgo_inflight_task_total,
{});
DEFINE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_executing_task_total,
"[cpp]async cgo executing task");
DEFINE_PROMETHEUS_GAUGE(internal_cgo_executing_task_total_all,
internal_cgo_executing_task_total,
{});
} // namespace milvus::monitor } // namespace milvus::monitor

View File

@ -142,4 +142,18 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency);
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_retrieve_get_target_entry_latency); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_retrieve_get_target_entry_latency);
DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_get_target_entry_latency); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_get_target_entry_latency);
// async cgo metrics
DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_queue_duration_seconds);
DECLARE_PROMETHEUS_HISTOGRAM(internal_cgo_queue_duration_seconds_all);
DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_execute_duration_seconds);
DECLARE_PROMETHEUS_HISTOGRAM(internal_cgo_execute_duration_seconds_all);
DECLARE_PROMETHEUS_COUNTER_FAMILY(internal_cgo_cancel_before_execute_total)
DECLARE_PROMETHEUS_COUNTER(internal_cgo_cancel_before_execute_total_all);
DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_pool_size);
DECLARE_PROMETHEUS_GAUGE(internal_cgo_pool_size_all);
DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_inflight_task_total);
DECLARE_PROMETHEUS_GAUGE(internal_cgo_inflight_task_total_all);
DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_executing_task_total);
DECLARE_PROMETHEUS_GAUGE(internal_cgo_executing_task_total_all);
} // namespace milvus::monitor } // namespace milvus::monitor