diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 8fd3e9373d01..620c58fbb7e6 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -96,7 +96,7 @@ impl Initialized { let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; let (participation_sender, participation_receiver) = mpsc::channel(1); - let participation = Participation::new(participation_sender); + let participation = Participation::new(participation_sender, metrics.clone()); let highest_session = rolling_session_window.latest_session(); Self { @@ -916,12 +916,17 @@ impl Initialized { } else { self.metrics.on_queued_best_effort_participation(); } + let request_timer = Arc::new(self.metrics.time_participation_pipeline()); let r = self .participation .queue_participation( ctx, priority, - ParticipationRequest::new(new_state.candidate_receipt().clone(), session), + ParticipationRequest::new( + new_state.candidate_receipt().clone(), + session, + request_timer, + ), ) .await; log_error(r)?; diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 1c66c6c6099c..de155c1e8f42 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem { ?candidate_hash, "Found valid dispute, with no vote from us on startup - participating." 
); + let request_timer = Arc::new(self.metrics.time_participation_pipeline()); participation_requests.push(( ParticipationPriority::with_priority_if(is_included), ParticipationRequest::new( vote_state.votes().candidate_receipt.clone(), session, + request_timer, ), )); } diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs index 70cd49ac49d1..977f5cc700f6 100644 --- a/node/core/dispute-coordinator/src/metrics.rs +++ b/node/core/dispute-coordinator/src/metrics.rs @@ -32,6 +32,16 @@ struct MetricsInner { vote_cleanup_time: prometheus::Histogram, /// Number of refrained participations. refrained_participations: prometheus::Counter, + /// Distribution of participation durations. + participation_durations: prometheus::Histogram, + /// Measures the duration of the full participation pipeline: From when + /// a participation request is first queued to when participation in the + /// requested dispute is complete. + participation_pipeline_durations: prometheus::Histogram, + /// Size of participation priority queue + participation_priority_queue_size: prometheus::Gauge, + /// Size of participation best effort queue + participation_best_effort_queue_size: prometheus::Gauge, } /// Candidate validation metrics. @@ -96,6 +106,36 @@ impl Metrics { metrics.refrained_participations.inc(); } } + + /// Provide a timer for participation durations which updates on drop. + pub(crate) fn time_participation( + &self, + ) -> Option { + self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer()) + } + + /// Provide a timer for participation pipeline durations which updates on drop. 
+ pub(crate) fn time_participation_pipeline( + &self, + ) -> Option { + self.0 + .as_ref() + .map(|metrics| metrics.participation_pipeline_durations.start_timer()) + } + + /// Set the priority_queue_size metric + pub fn report_priority_queue_size(&self, size: u64) { + if let Some(metrics) = &self.0 { + metrics.participation_priority_queue_size.set(size); + } + } + + /// Set the best_effort_queue_size metric + pub fn report_best_effort_queue_size(&self, size: u64) { + if let Some(metrics) = &self.0 { + metrics.participation_best_effort_queue_size.set(size); + } + } } impl metrics::Metrics for Metrics { @@ -163,6 +203,34 @@ impl metrics::Metrics for Metrics { ))?, registry, )?, + participation_durations: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_dispute_participation_durations", + "Time spent within fn Participation::participate", + ) + )?, + registry, + )?, + participation_pipeline_durations: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_dispute_participation_pipeline_durations", + "Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.", + ) + )?, + registry, + )?, + participation_priority_queue_size: prometheus::register( + prometheus::Gauge::new("polkadot_parachain_dispute_participation_priority_queue_size", + "Number of disputes waiting for local participation in the priority queue.")?, + registry, + )?, + participation_best_effort_queue_size: prometheus::register( + prometheus::Gauge::new("polkadot_parachain_dispute_participation_best_effort_queue_size", + "Number of disputes waiting for local participation in the best effort queue.")?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs 
index 51ad52f1bace..e366adc5facb 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -48,6 +48,9 @@ mod queues; use queues::Queues; pub use queues::{ParticipationPriority, ParticipationRequest, QueueError}; +use crate::metrics::Metrics; +use polkadot_node_subsystem_util::metrics::prometheus::prometheus; + /// How many participation processes do we want to run in parallel the most. /// /// This should be a relatively low value, while we might have a speedup once we fetched the data, @@ -71,6 +74,8 @@ pub struct Participation { worker_sender: WorkerMessageSender, /// Some recent block for retrieving validation code from chain. recent_block: Option<(BlockNumber, Hash)>, + /// Metrics handle cloned from Initialized + metrics: Metrics, } /// Message from worker tasks. @@ -135,12 +140,13 @@ impl Participation { /// The passed in sender will be used by background workers to communicate back their results. /// The calling context should make sure to call `Participation::on_worker_message()` for the /// received messages. 
- pub fn new(sender: WorkerMessageSender) -> Self { + pub fn new(sender: WorkerMessageSender, metrics: Metrics) -> Self { Self { running_participations: HashSet::new(), - queue: Queues::new(), + queue: Queues::new(metrics.clone()), worker_sender: sender, recent_block: None, + metrics, } } @@ -253,11 +259,20 @@ impl Participation { req: ParticipationRequest, recent_head: Hash, ) -> FatalResult<()> { if self.running_participations.insert(*req.candidate_hash()) { + // Only time participations we actually spawn; a duplicate must not record a sample. + let participation_timer = self.metrics.time_participation(); let sender = ctx.sender().clone(); ctx.spawn( "participation-worker", - participate(self.worker_sender.clone(), sender, recent_head, req).boxed(), + participate( + self.worker_sender.clone(), + sender, + recent_head, + req, + participation_timer, + ) + .boxed(), ) .map_err(FatalError::SpawnFailed)?; } @@ -269,7 +284,8 @@ async fn participate( mut result_sender: WorkerMessageSender, mut sender: impl overseer::DisputeCoordinatorSenderTrait, block_hash: Hash, - req: ParticipationRequest, + req: ParticipationRequest, // Sends metric data via request_timer field when dropped + _participation_timer: Option, // Sends metric data when dropped ) { #[cfg(test)] // Hack for tests, so we get recovery messages not too early. diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index b632e04dbb4f..cbfb71e20b42 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .
-use std::{cmp::Ordering, collections::BTreeMap}; +use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use futures::channel::oneshot; use polkadot_node_subsystem::{messages::ChainApiMessage, overseer}; @@ -25,6 +25,9 @@ use crate::{ LOG_TARGET, }; +use crate::metrics::Metrics; +use polkadot_node_subsystem_util::metrics::prometheus::prometheus; + #[cfg(test)] mod tests; @@ -56,14 +59,18 @@ pub struct Queues { /// Priority queue. priority: BTreeMap, + + /// Handle for recording queues data in metrics + metrics: Metrics, } /// A dispute participation request that can be queued. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Clone)] pub struct ParticipationRequest { candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, + _request_timer: Arc>, // Sends metric data when request is dropped } /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. @@ -107,8 +114,17 @@ pub enum QueueError { impl ParticipationRequest { /// Create a new `ParticipationRequest` to be queued. - pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self { - Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session } + pub fn new( + candidate_receipt: CandidateReceipt, + session: SessionIndex, + request_timer: Arc>, + ) -> Self { + Self { + candidate_hash: candidate_receipt.hash(), + candidate_receipt, + session, + _request_timer: request_timer, + } } pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt { @@ -126,10 +142,29 @@ impl ParticipationRequest { } } +// We want to compare participation requests in unit tests, so we +// only implement Eq for tests. 
+#[cfg(test)] +impl PartialEq for ParticipationRequest { + fn eq(&self, other: &Self) -> bool { + let ParticipationRequest { + candidate_receipt, + candidate_hash, + session: _session, + _request_timer, + } = self; + candidate_receipt == other.candidate_receipt() && + candidate_hash == other.candidate_hash() && + self.session == other.session() + } +} +#[cfg(test)] +impl Eq for ParticipationRequest {} + impl Queues { /// Create new `Queues`. - pub fn new() -> Self { - Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() } + pub fn new(metrics: Metrics) -> Self { + Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), metrics } } /// Will put message in queue, either priority or best effort depending on priority. @@ -154,9 +189,14 @@ impl Queues { /// First the priority queue is considered and then the best effort one. pub fn dequeue(&mut self) -> Option { if let Some(req) = self.pop_priority() { + self.metrics.report_priority_queue_size(self.priority.len() as u64); + return Some(req.1) + } + if let Some(req) = self.pop_best_effort() { + self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); return Some(req.1) } - self.pop_best_effort().map(|d| d.1) + None } /// Reprioritizes any participation requests pertaining to the @@ -180,6 +220,9 @@ impl Queues { } if let Some(request) = self.best_effort.remove(&comparator) { self.priority.insert(comparator, request); + // Report changes to both queue sizes + self.metrics.report_priority_queue_size(self.priority.len() as u64); + self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) } @@ -197,6 +240,8 @@ impl Queues { // Remove any best effort entry: self.best_effort.remove(&comparator); self.priority.insert(comparator, req); + self.metrics.report_priority_queue_size(self.priority.len() as u64); + self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } else { if self.priority.contains_key(&comparator) { // The candidate is already in 
priority queue - don't @@ -207,6 +252,7 @@ impl Queues { return Err(QueueError::BestEffortFull) } self.best_effort.insert(comparator, req); + self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) } diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index 164e7b3f011b..63df0d0a11ef 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -14,10 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use crate::ParticipationPriority; +use crate::{metrics::Metrics, ParticipationPriority}; use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; use polkadot_primitives::{BlockNumber, Hash}; +use std::sync::Arc; use super::{CandidateComparator, ParticipationRequest, QueueError, Queues}; @@ -26,7 +27,8 @@ fn make_participation_request(hash: Hash) -> ParticipationRequest { let mut receipt = dummy_candidate_receipt(dummy_hash()); // make it differ: receipt.commitments_hash = hash; - ParticipationRequest::new(receipt, 1) + let request_timer = Arc::new(Metrics::default().time_participation_pipeline()); + ParticipationRequest::new(receipt, 1, request_timer) } /// Make dummy comparator for request, based on the given block number. @@ -44,7 +46,8 @@ fn make_dummy_comparator( /// block number should be treated with lowest priority. 
#[test] fn ordering_works_as_expected() { - let mut queue = Queues::new(); + let metrics = Metrics::default(); + let mut queue = Queues::new(metrics); let req1 = make_participation_request(Hash::repeat_byte(0x01)); let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req3 = make_participation_request(Hash::repeat_byte(0x03)); @@ -91,7 +94,7 @@ fn ordering_works_as_expected() { queue.queue_with_comparator( make_dummy_comparator(&req_prio_full, Some(3)), ParticipationPriority::Priority, - req_prio_full + req_prio_full, ), Err(QueueError::PriorityFull) ); @@ -99,7 +102,7 @@ fn ordering_works_as_expected() { queue.queue_with_comparator( make_dummy_comparator(&req_full, Some(3)), ParticipationPriority::BestEffort, - req_full + req_full, ), Err(QueueError::BestEffortFull) ); @@ -118,7 +121,8 @@ fn ordering_works_as_expected() { /// No matter how often a candidate gets queued, it should only ever get dequeued once. #[test] fn candidate_is_only_dequeued_once() { - let mut queue = Queues::new(); + let metrics = Metrics::default(); + let mut queue = Queues::new(metrics); let req1 = make_participation_request(Hash::repeat_byte(0x01)); let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03)); @@ -154,7 +158,6 @@ fn candidate_is_only_dequeued_once() { req_prio.clone(), ) .unwrap(); - // Insert first as best effort: queue .queue_with_comparator( @@ -195,5 +198,5 @@ fn candidate_is_only_dequeued_once() { assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio)); assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort)); assert_eq!(queue.dequeue(), Some(req1)); - assert_eq!(queue.dequeue(), None); + assert_matches!(queue.dequeue(), None); } diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index 134324f69e26..a6e5f86616d7 100644 ---
a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -72,7 +72,8 @@ async fn participate_with_commitments_hash( }; let session = 1; - let req = ParticipationRequest::new(candidate_receipt, session); + let request_timer = Arc::new(participation.metrics.time_participation_pipeline()); + let req = ParticipationRequest::new(candidate_receipt, session, request_timer); participation .queue_participation(ctx, ParticipationPriority::BestEffort, req) @@ -189,7 +190,7 @@ fn same_req_wont_get_queued_if_participation_is_already_running() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); for _ in 0..MAX_PARALLEL_PARTICIPATIONS { @@ -228,7 +229,7 @@ fn reqs_get_queued_when_out_of_capacity() { let test = async { let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); for i in 0..MAX_PARALLEL_PARTICIPATIONS { @@ -292,7 +293,7 @@ fn reqs_get_queued_on_no_recent_block() { let (mut unblock_test, mut wait_for_verification) = mpsc::channel(0); let test = async { let (sender, _worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); participate(&mut ctx, &mut participation).await.unwrap(); // We have initiated participation but we'll block `active_leaf` so that we can check that @@ -342,7 +343,7 @@ fn 
cannot_participate_if_cannot_recover_available_data() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -372,7 +373,7 @@ fn cannot_participate_if_cannot_recover_validation_code() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -409,7 +410,7 @@ fn cast_invalid_vote_if_available_data_is_invalid() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -440,7 +441,7 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -477,7 +478,7 @@ fn cast_invalid_vote_if_commitments_dont_match() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut 
worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -514,7 +515,7 @@ fn cast_valid_vote_if_validation_passes() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap();