Skip to content

Commit

Permalink
More metrics on PP loop
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 9, 2024
1 parent fa6d081 commit a718fcd
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 5 deletions.
35 changes: 34 additions & 1 deletion crates/worker/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,23 @@

/// Optional to have but adds description/help message to the metrics emitted to
/// the metrics' sink.
use metrics::{describe_counter, Unit};
use metrics::{describe_counter, describe_histogram, Unit};

// Names of the partition "total" metrics (each name ends in `.total`).
pub const PARTITION_APPLY_COMMAND: &str = "restate.partition.apply_command.total";
pub const PARTITION_ACTUATOR_HANDLED: &str = "restate.partition.actuator_handled.total";
pub const PARTITION_TIMER_DUE_HANDLED: &str = "restate.partition.timer_due_handled.total";
pub const PARTITION_STORAGE_TX_CREATED: &str = "restate.partition.storage_tx_created.total";
pub const PARTITION_STORAGE_TX_COMMITTED: &str = "restate.partition.storage_tx_committed.total";

/// Histogram name: time taken to read the next record from the log, including
/// any time spent waiting for a record to become available.
pub const PP_LOG_READ_NEXT_DURATION: &str = "restate.partition.log_read_next_duration.seconds";

/// Histogram name: time spent processing a single log record.
pub const PP_APPLY_RECORD_DURATION: &str = "restate.partition.apply_record_duration.seconds";
/// Histogram name: time between the start of a loop iteration and the moment
/// one of its branches became ready.
pub const PP_WAIT_OR_IDLE_DURATION: &str = "restate.partition.wait_or_idle_duration.seconds";
/// Histogram name: time spent handling one action effect.
pub const PP_APPLY_EFFECTS_DURATION: &str = "restate.partition.apply_effects_duration.seconds";
/// Histogram name: time spent handling one fired timer.
pub const PP_APPLY_TIMERS_DURATION: &str = "restate.partition.apply_timers_duration.seconds";

/// Label key under which the partition id is attached to the histograms above.
pub const PARTITION_LABEL: &str = "partition";

pub(crate) fn describe_metrics() {
describe_counter!(
PARTITION_APPLY_COMMAND,
Expand All @@ -44,4 +53,28 @@ pub(crate) fn describe_metrics() {
Unit::Count,
"Storage transactions committed by applying partition state machine commands"
);
// Help text for the log-read histogram; the measured time includes waiting
// for a record to arrive.
describe_histogram!(
    PP_LOG_READ_NEXT_DURATION,
    Unit::Seconds,
    "Time spent attempting to read the next record off bifrost, this is inclusive of wait time if no records are available to read"
);
describe_histogram!(
PP_APPLY_RECORD_DURATION,
Unit::Seconds,
"Time spent processing a single bifrost message"
);
describe_histogram!(
PP_WAIT_OR_IDLE_DURATION,
Unit::Seconds,
"Time spent since last activity on this partition processor"
);
describe_histogram!(
PP_APPLY_EFFECTS_DURATION,
Unit::Seconds,
"Time spent applying effects in a single iteration"
);
// Help text for the timers histogram. It must differ from the effects
// histogram's text, otherwise the two metrics are indistinguishable in the sink.
describe_histogram!(
    PP_APPLY_TIMERS_DURATION,
    Unit::Seconds,
    "Time spent applying timers in a single iteration"
);
}
31 changes: 27 additions & 4 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metric_definitions::{PARTITION_ACTUATOR_HANDLED, PARTITION_TIMER_DUE_HANDLED};
use crate::metric_definitions::{
PARTITION_ACTUATOR_HANDLED, PARTITION_LABEL, PARTITION_TIMER_DUE_HANDLED,
PP_APPLY_EFFECTS_DURATION, PP_APPLY_RECORD_DURATION, PP_APPLY_TIMERS_DURATION,
PP_LOG_READ_NEXT_DURATION, PP_WAIT_OR_IDLE_DURATION,
};
use crate::partition::leadership::{ActionEffect, LeadershipState};
use crate::partition::state_machine::{ActionCollector, Effects, StateMachine};
use crate::partition::storage::{DedupSequenceNumberResolver, PartitionStorage, Transaction};
use assert2::let_assert;
use futures::StreamExt;
use metrics::counter;
use metrics::{counter, histogram};
use restate_core::metadata;
use restate_network::Networking;
use restate_partition_store::{PartitionStore, RocksDBTransaction};
use restate_types::identifiers::{PartitionId, PartitionKey};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::time::Instant;
use tracing::{debug, instrument, trace, Span};

mod action_effect_handler;
Expand Down Expand Up @@ -132,10 +137,15 @@ where
networking,
);

let mut cancellation = std::pin::pin!(cancellation_watcher());
let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string()));
loop {
let iteration_start = Instant::now();
tokio::select! {
_ = cancellation_watcher() => break,
_ = &mut cancellation => break,
record = log_reader.read_next() => {
let command_start = Instant::now();
histogram!(PP_WAIT_OR_IDLE_DURATION, PARTITION_LABEL => partition_id_str).record(iteration_start.elapsed());
let record = record?;
trace!(lsn = %record.0, "Processing bifrost record for '{}': {:?}", record.1.command.name(), record.1.header);

Expand Down Expand Up @@ -187,15 +197,22 @@ where
transaction.commit().await?;
state.handle_actions(action_collector.drain(..)).await?;
}
histogram!(PP_APPLY_RECORD_DURATION, PARTITION_LABEL => partition_id_str).record(command_start.elapsed());
},
// Action-effect branch: record how long this iteration waited, count the
// handled actuator message, apply the effect, then record how long that took.
action_effect = action_effect_stream.next() => {
    histogram!(PP_WAIT_OR_IDLE_DURATION, PARTITION_LABEL => partition_id_str).record(iteration_start.elapsed());
    let effect_start = Instant::now();
    counter!(PARTITION_ACTUATOR_HANDLED).increment(1);
    // A `None` from the stream means it has been closed; treat that as a fatal error.
    let action_effect = action_effect.ok_or_else(|| anyhow::anyhow!("action effect stream is closed"))?;
    state.handle_action_effect(action_effect).await?;
    // Carry the partition label like every other histogram in this loop so the
    // series can be broken down per partition.
    histogram!(PP_APPLY_EFFECTS_DURATION, PARTITION_LABEL => partition_id_str).record(effect_start.elapsed());
},
// Timer branch: taken when `state.run_timer()` yields a timer.
timer = state.run_timer() => {
// Time elapsed since the start of this loop iteration until this branch fired.
histogram!(PP_WAIT_OR_IDLE_DURATION, PARTITION_LABEL => partition_id_str).record(iteration_start.elapsed());
// Started after the wait sample so handling time excludes the wait time.
let timer_start = Instant::now();
counter!(PARTITION_TIMER_DUE_HANDLED).increment(1);
// Timers go through the same handler as other action effects, wrapped in `ActionEffect::Timer`.
state.handle_action_effect(ActionEffect::Timer(timer)).await?;
// Recorded only when handling succeeded; the `?` above returns early on error.
histogram!(PP_APPLY_TIMERS_DURATION, PARTITION_LABEL => partition_id_str).record(timer_start.elapsed());
},
}
}
Expand Down Expand Up @@ -342,18 +359,24 @@ async fn is_outdated_or_duplicate(

/// Wraps a bifrost read stream together with the id of the log it reads from.
struct LogReader {
// Stream created by `bifrost.create_reader`; records are pulled from it one at a time.
log_reader: LogReadStream,
// Kept so reads can be reported with a `log_id` metric label.
log_id: LogId,
}

impl LogReader {
/// Builds a reader over `log_id` whose stream is created by bifrost from `lsn`,
/// remembering the log id alongside it.
fn new(bifrost: &Bifrost, log_id: LogId, lsn: Lsn) -> Self {
    let log_reader = bifrost.create_reader(log_id, lsn);
    Self { log_reader, log_id }
}

/// Reads the next record from the underlying log stream and deserializes it.
///
/// Returns the record's offset paired with the decoded envelope. Once the
/// stream has yielded a record, the time elapsed since entry (which includes
/// the time spent awaiting the stream) is recorded in the
/// `PP_LOG_READ_NEXT_DURATION` histogram, labelled with this reader's log id.
///
/// # Errors
/// Propagates a failed stream read (no duration is recorded in that case) and
/// returns whatever error `deserialize_record` produces.
async fn read_next(&mut self) -> anyhow::Result<(Lsn, Envelope)> {
    let start = Instant::now();
    let LogRecord { record, offset } = self.log_reader.read_next().await?;
    // Hold the result so the duration is recorded whether or not decoding succeeded.
    let res = Self::deserialize_record(record).map(|envelope| (offset, envelope));
    histogram!(PP_LOG_READ_NEXT_DURATION, "log_id" => self.log_id.to_string())
        .record(start.elapsed());
    res
}

#[allow(dead_code)]
Expand Down

0 comments on commit a718fcd

Please sign in to comment.