diff --git a/internal/core/src/futures/Executor.cpp b/internal/core/src/futures/Executor.cpp index b424809e0a..486ff19536 100644 --- a/internal/core/src/futures/Executor.cpp +++ b/internal/core/src/futures/Executor.cpp @@ -12,6 +12,7 @@ #include #include "Executor.h" #include "common/Common.h" +#include "monitor/prometheus_client.h" namespace milvus::futures { @@ -19,8 +20,9 @@ const int kNumPriority = 3; folly::CPUThreadPoolExecutor* getGlobalCPUExecutor() { + auto thread_num = std::thread::hardware_concurrency(); static folly::CPUThreadPoolExecutor executor( - std::thread::hardware_concurrency(), + thread_num, folly::CPUThreadPoolExecutor::makeDefaultPriorityQueue(kNumPriority), std::make_shared("MILVUS_FUTURE_CPU_")); return &executor; diff --git a/internal/core/src/futures/Future.h b/internal/core/src/futures/Future.h index 60eb804e96..3e40ec78ce 100644 --- a/internal/core/src/futures/Future.h +++ b/internal/core/src/futures/Future.h @@ -19,9 +19,106 @@ #include "future_c_types.h" #include "LeakyResult.h" #include "Ready.h" +#include "pb/cgo_msg.pb.h" +#include +#include "monitor/prometheus_client.h" namespace milvus::futures { +template +class Metrics; + +template +class Metrics { + public: + class ExecutionGuard { + public: + explicit ExecutionGuard(Metrics& metrics) : metrics_(metrics) { + metrics.startExecute(); + } + + ExecutionGuard(const ExecutionGuard&) = delete; + ExecutionGuard(const ExecutionGuard&&) = delete; + ExecutionGuard& + operator=(const ExecutionGuard&) = delete; + ExecutionGuard& + operator=(const ExecutionGuard&&) = delete; + + ~ExecutionGuard() { + metrics_.executeDone(); + } + + private: + Metrics& metrics_; + }; + + explicit Metrics() + : time_point_(std::chrono::steady_clock::now()), + queue_duration_(0), + execute_duration_(0), + cancelled_before_execute_(false) { + milvus::monitor::internal_cgo_inflight_task_total_all.Increment(); + } + + Metrics(const Metrics&) = delete; + Metrics(const Metrics&&) = delete; + Metrics& + operator=(const Metrics&) = delete; + Metrics& + operator=(const Metrics&&) = delete; + + ~Metrics() { + milvus::monitor::internal_cgo_inflight_task_total_all.Decrement(); + milvus::monitor::internal_cgo_queue_duration_seconds_all.Observe( + std::chrono::duration(queue_duration_).count()); + if (cancelled_before_execute_) { + milvus::monitor::internal_cgo_cancel_before_execute_total_all + .Increment(); + } else { + milvus::monitor::internal_cgo_executing_task_total_all.Decrement(); + milvus::monitor::internal_cgo_execute_duration_seconds_all.Observe( + std::chrono::duration(execute_duration_).count()); + } + } + + void + withCancel() { + queue_duration_ = std::chrono::duration_cast( + std::chrono::steady_clock::now() - time_point_); + cancelled_before_execute_ = true; + } + + void + startExecute() { + auto now = std::chrono::steady_clock::now(); + queue_duration_ = + std::chrono::duration_cast(now - time_point_); + time_point_ = now; + milvus::monitor::internal_cgo_executing_task_total_all.Increment(); + } + + void + executeDone() { + auto now = std::chrono::steady_clock::now(); + execute_duration_ = + std::chrono::duration_cast(now - time_point_); + } + + private: + std::chrono::steady_clock::time_point time_point_; + Duration queue_duration_; + Duration execute_duration_; + bool cancelled_before_execute_; +}; + +// FutureResult is a struct that represents the result of the future. +class FutureResult { + public: + void* result; + CStatus status; + Metrics metrics; +}; + /// @brief a virtual class that represents a future can be polymorphic called by CGO code. /// implemented by Future template. class IFuture { @@ -103,7 +200,8 @@ class Future : public IFuture { /// use `async`. Future() - : ready_(std::make_shared>>()), + : metrics_(), + ready_(std::make_shared>>()), promise_(std::make_shared>()), cancellation_source_() { } @@ -176,8 +274,15 @@ class Future : public IFuture { auto cancellation_token = CancellationToken(cancellation_source_.getToken()); auto runner = [fn = std::forward(fn), - cancellation_token = std::move(cancellation_token)]() { - cancellation_token.throwIfCancelled(); + cancellation_token = std::move(cancellation_token), + this]() { + if (cancellation_token.isCancellationRequested()) { + metrics_.withCancel(); + throw folly::FutureCancellation(); + } + + auto executionGuard = + Metrics::ExecutionGuard(metrics_); return fn(cancellation_token); }; @@ -220,9 +325,9 @@ class Future : public IFuture { } private: + Metrics metrics_; std::shared_ptr>> ready_; std::shared_ptr> promise_; folly::CancellationSource cancellation_source_; }; - }; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/future_c.cpp b/internal/core/src/futures/future_c.cpp index 1221d2d653..9f246dd328 100644 --- a/internal/core/src/futures/future_c.cpp +++ b/internal/core/src/futures/future_c.cpp @@ -16,6 +16,7 @@ #include "Future.h" #include "Executor.h" #include "log/Log.h" +#include "monitor/prometheus_client.h" extern "C" void future_cancel(CFuture* future) { @@ -55,6 +56,7 @@ future_destroy(CFuture* future) { extern "C" void executor_set_thread_num(int thread_num) { milvus::futures::getGlobalCPUExecutor()->setNumThreads(thread_num); + milvus::monitor::internal_cgo_pool_size_all.Set(thread_num); LOG_INFO("future executor setup cpu executor with thread num: {}", thread_num); } diff --git a/internal/core/src/monitor/prometheus_client.cpp b/internal/core/src/monitor/prometheus_client.cpp index dca0ab677e..bab294c4e3 100644 --- a/internal/core/src/monitor/prometheus_client.cpp +++ b/internal/core/src/monitor/prometheus_client.cpp @@ -10,9 +10,30 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "prometheus_client.h" +#include namespace milvus::monitor { +const prometheus::Histogram::BucketBoundaries secondsBuckets = { + std::chrono::duration(std::chrono::microseconds(10)).count(), + std::chrono::duration(std::chrono::microseconds(50)).count(), + std::chrono::duration(std::chrono::microseconds(100)).count(), + std::chrono::duration(std::chrono::microseconds(250)).count(), + std::chrono::duration(std::chrono::microseconds(500)).count(), + std::chrono::duration(std::chrono::milliseconds(1)).count(), + std::chrono::duration(std::chrono::milliseconds(5)).count(), + std::chrono::duration(std::chrono::milliseconds(10)).count(), + std::chrono::duration(std::chrono::milliseconds(20)).count(), + std::chrono::duration(std::chrono::milliseconds(50)).count(), + std::chrono::duration(std::chrono::milliseconds(100)).count(), + std::chrono::duration(std::chrono::milliseconds(200)).count(), + std::chrono::duration(std::chrono::milliseconds(500)).count(), + std::chrono::duration(std::chrono::seconds(1)).count(), + std::chrono::duration(std::chrono::seconds(2)).count(), + std::chrono::duration(std::chrono::seconds(5)).count(), + std::chrono::duration(std::chrono::seconds(10)).count(), +}; + const prometheus::Histogram::BucketBoundaries buckets = {1, 2, 4, @@ -255,4 +276,44 @@ DEFINE_PROMETHEUS_GAUGE(internal_mmap_in_used_count_anon, DEFINE_PROMETHEUS_GAUGE(internal_mmap_in_used_count_file, internal_mmap_in_used_count, mmapAllocatedCountFileLabel) + +// async cgo metrics +DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_queue_duration_seconds, + "[cpp]async cgo queue duration"); +DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS( + internal_cgo_queue_duration_seconds_all, + internal_cgo_queue_duration_seconds, + {}, + secondsBuckets); + +DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_execute_duration_seconds, + "[cpp]async execute duration"); +DEFINE_PROMETHEUS_HISTOGRAM_WITH_BUCKETS( + internal_cgo_execute_duration_seconds_all, + internal_cgo_execute_duration_seconds, + {}, + secondsBuckets); + +DEFINE_PROMETHEUS_COUNTER_FAMILY(internal_cgo_cancel_before_execute_total, + "[cpp]async cgo cancel before execute count"); +DEFINE_PROMETHEUS_COUNTER(internal_cgo_cancel_before_execute_total_all, + internal_cgo_cancel_before_execute_total, + {}); + +DEFINE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_pool_size, + "[cpp]async cgo pool size"); +DEFINE_PROMETHEUS_GAUGE(internal_cgo_pool_size_all, internal_cgo_pool_size, {}); + +DEFINE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_inflight_task_total, + "[cpp]async cgo inflight task"); +DEFINE_PROMETHEUS_GAUGE(internal_cgo_inflight_task_total_all, + internal_cgo_inflight_task_total, + {}); + +DEFINE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_executing_task_total, + "[cpp]async cgo executing task"); +DEFINE_PROMETHEUS_GAUGE(internal_cgo_executing_task_total_all, + internal_cgo_executing_task_total, + {}); + } // namespace milvus::monitor diff --git a/internal/core/src/monitor/prometheus_client.h b/internal/core/src/monitor/prometheus_client.h index 5eaefee356..e10bc2618b 100644 --- a/internal/core/src/monitor/prometheus_client.h +++ b/internal/core/src/monitor/prometheus_client.h @@ -142,4 +142,18 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_retrieve_get_target_entry_latency); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_get_target_entry_latency); +// async cgo metrics +DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_queue_duration_seconds); +DECLARE_PROMETHEUS_HISTOGRAM(internal_cgo_queue_duration_seconds_all); +DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_execute_duration_seconds); +DECLARE_PROMETHEUS_HISTOGRAM(internal_cgo_execute_duration_seconds_all); +DECLARE_PROMETHEUS_COUNTER_FAMILY(internal_cgo_cancel_before_execute_total) +DECLARE_PROMETHEUS_COUNTER(internal_cgo_cancel_before_execute_total_all); +DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_pool_size); +DECLARE_PROMETHEUS_GAUGE(internal_cgo_pool_size_all); +DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_inflight_task_total); +DECLARE_PROMETHEUS_GAUGE(internal_cgo_inflight_task_total_all); +DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_cgo_executing_task_total); +DECLARE_PROMETHEUS_GAUGE(internal_cgo_executing_task_total_all); + } // namespace milvus::monitor