Skip to content

Commit

Permalink
Export tokio runtime metrics via prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 17, 2024
1 parent ff2ff70 commit 1fbd7db
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 1 deletion.
5 changes: 5 additions & 0 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::{Duration, Instant};
use futures::{Future, FutureExt};
use metrics::counter;
use restate_types::config::CommonOptions;
use tokio::runtime::RuntimeMetrics;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
Expand Down Expand Up @@ -142,6 +143,10 @@ pub struct TaskCenter {
static_assertions::assert_impl_all!(TaskCenter: Send, Sync, Clone);

impl TaskCenter {
pub fn default_runtime_metrics(&self) -> RuntimeMetrics {
self.inner.default_runtime_handle.metrics()
}

/// Use to monitor an on-going shutdown when requested
pub fn watch_shutdown(&self) -> WaitForCancellationFutureOwned {
self.inner.global_cancel_token.clone().cancelled_owned()
Expand Down
5 changes: 5 additions & 0 deletions crates/node/src/network_server/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::network_server::prometheus_helpers::{
};
use crate::network_server::state::NodeCtrlHandlerState;

use super::prometheus_helpers::submit_tokio_metrics;

const ROCKSDB_TICKERS: &[Ticker] = &[
Ticker::BlockCacheBytesRead,
Ticker::BlockCacheBytesWrite,
Expand Down Expand Up @@ -176,6 +178,9 @@ pub async fn render_metrics(State(state): State<NodeCtrlHandlerState>) -> String
let default_cf = CfName::new("default");
let mut out = String::new();

// Default tokio runtime metrics
submit_tokio_metrics("default", state.task_center.default_runtime_metrics());

// Response content type is plain/text and that's expected.
if let Some(prometheus_handle) = state.prometheus_handle {
// Internal system metrics
Expand Down
34 changes: 34 additions & 0 deletions crates/node/src/network_server/prometheus_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

use std::fmt::Write;

use metrics::gauge;
use metrics_exporter_prometheus::formatting;
use restate_rocksdb::RocksDb;
use rocksdb::statistics::{HistogramData, Ticker};
use tokio::runtime::RuntimeMetrics;

static PREFIX: &str = "restate";

Expand Down Expand Up @@ -167,3 +169,35 @@ pub fn format_rocksdb_histogram_for_prometheus(
);
let _ = writeln!(out);
}

pub fn submit_tokio_metrics(runtime: &'static str, stats: RuntimeMetrics) {
gauge!("restate.tokio.num_workers", "runtime" => runtime).set(stats.num_workers() as f64);
gauge!("restate.tokio.blocking_threads", "runtime" => runtime)
.set(stats.num_blocking_threads() as f64);
gauge!("restate.tokio.blocking_queue_depth", "runtime" => runtime)
.set(stats.blocking_queue_depth() as f64);
gauge!("restate.tokio.active_tasks_count", "runtime" => runtime)
.set(stats.active_tasks_count() as f64);
gauge!("restate.tokio.io_driver_ready_count", "runtime" => runtime)
.set(stats.io_driver_ready_count() as f64);
gauge!("restate.tokio.remote_schedule_count", "runtime" => runtime)
.set(stats.remote_schedule_count() as f64);
// per worker stats
for idx in 0..stats.num_workers() {
gauge!("restate.tokio.worker_overflow_count", "runtime" => runtime, "worker" =>
idx.to_string())
.set(stats.worker_overflow_count(idx) as f64);
gauge!("restate.tokio.worker_poll_count", "runtime" => runtime, "worker" => idx.to_string())
.set(stats.worker_poll_count(idx) as f64);
gauge!("restate.tokio.worker_park_count", "runtime" => runtime, "worker" => idx.to_string())
.set(stats.worker_park_count(idx) as f64);
gauge!("restate.tokio.worker_noop_count", "runtime" => runtime, "worker" => idx.to_string())
.set(stats.worker_noop_count(idx) as f64);
gauge!("restate.tokio.worker_steal_count", "runtime" => runtime, "worker" => idx.to_string())
.set(stats.worker_steal_count(idx) as f64);
gauge!("restate.tokio.worker_total_busy_duration_seconds", "runtime" => runtime, "worker" => idx.to_string())
.set(stats.worker_total_busy_duration(idx).as_secs_f64());
gauge!("restate.tokio.worker_mean_poll_time", "runtime" => runtime, "worker" => idx.to_string())
.set(stats.worker_mean_poll_time(idx).as_secs_f64());
}
}
4 changes: 3 additions & 1 deletion crates/node/src/network_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ impl NetworkServer {
}

pub async fn run(self, options: CommonOptions) -> Result<(), anyhow::Error> {
let tc = task_center();
// Configure Metric Exporter
let mut state_builder = NodeCtrlHandlerStateBuilder::default();
state_builder.task_center(tc.clone());

if !options.disable_prometheus {
state_builder.prometheus_handle(Some(install_global_prometheus_recorder(&options)));
Expand Down Expand Up @@ -89,7 +91,7 @@ impl NetworkServer {
let server_builder = tonic::transport::Server::builder()
.layer(TraceLayer::new_for_grpc().make_span_with(span_factory))
.add_service(NodeSvcServer::new(NodeSvcHandler::new(
task_center(),
tc,
self.worker_deps,
self.connection_manager,
)))
Expand Down
2 changes: 2 additions & 0 deletions crates/node/src/network_server/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
// by the Apache License, Version 2.0.

use metrics_exporter_prometheus::PrometheusHandle;
use restate_core::TaskCenter;

#[derive(Clone, derive_builder::Builder)]
pub struct NodeCtrlHandlerState {
#[builder(default)]
pub prometheus_handle: Option<PrometheusHandle>,
pub task_center: TaskCenter,
}

0 comments on commit 1fbd7db

Please sign in to comment.