Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Adding Dispute Participation Metrics #6838

Merged
merged 15 commits into from
Mar 11, 2023
Merged
9 changes: 7 additions & 2 deletions node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Initialized {
let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;

let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender);
let participation = Participation::new(participation_sender, metrics.clone());
let highest_session = rolling_session_window.latest_session();

Self {
Expand Down Expand Up @@ -916,12 +916,17 @@ impl Initialized {
} else {
self.metrics.on_queued_best_effort_participation();
}
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
let r = self
.participation
.queue_participation(
ctx,
priority,
ParticipationRequest::new(new_state.candidate_receipt().clone(), session),
ParticipationRequest::new(
new_state.candidate_receipt().clone(),
session,
request_timer,
),
)
.await;
log_error(r)?;
Expand Down
2 changes: 2 additions & 0 deletions node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem {
?candidate_hash,
"Found valid dispute, with no vote from us on startup - participating."
);
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
gilescope marked this conversation as resolved.
Show resolved Hide resolved
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
vote_state.votes().candidate_receipt.clone(),
session,
request_timer,
),
));
}
Expand Down
68 changes: 68 additions & 0 deletions node/core/dispute-coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ struct MetricsInner {
vote_cleanup_time: prometheus::Histogram,
/// Number of refrained participations.
refrained_participations: prometheus::Counter<prometheus::U64>,
/// Distribution of participation durations.
participation_durations: prometheus::Histogram,
/// Measures the duration of the full participation pipeline: From when
/// a participation request is first queued to when participation in the
/// requested dispute is complete.
participation_pipeline_durations: prometheus::Histogram,
/// Size of participation priority queue
participation_priority_queue_size: prometheus::Gauge<prometheus::U64>,
/// Size of participation best effort queue
participation_best_effort_queue_size: prometheus::Gauge<prometheus::U64>,
}

/// Candidate validation metrics.
Expand Down Expand Up @@ -96,6 +106,36 @@ impl Metrics {
metrics.refrained_participations.inc();
}
}

/// Start a histogram timer for the `participation_durations` metric.
///
/// Returns `None` when this handle carries no inner metrics (`self.0` is `None`).
/// Otherwise the returned timer records its elapsed time into the histogram when
/// it is dropped, so callers only need to keep it alive for the span being measured.
pub(crate) fn time_participation(
    &self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    // No inner metrics means nothing to time; bail out with `None`.
    let inner = self.0.as_ref()?;
    Some(inner.participation_durations.start_timer())
}

/// Start a histogram timer for the `participation_pipeline_durations` metric.
///
/// Returns `None` when this handle carries no inner metrics (`self.0` is `None`).
/// Otherwise the returned timer records its elapsed time into the histogram when
/// it is dropped.
pub(crate) fn time_participation_pipeline(
    &self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    // No inner metrics means nothing to time; bail out with `None`.
    let inner = self.0.as_ref()?;
    Some(inner.participation_pipeline_durations.start_timer())
}

/// Record the current length of the participation priority queue.
///
/// Sets the `participation_priority_queue_size` gauge to `size`. Does nothing
/// when this handle carries no inner metrics (`self.0` is `None`).
pub fn report_priority_queue_size(&self, size: u64) {
    if let Some(inner) = self.0.as_ref() {
        inner.participation_priority_queue_size.set(size);
    }
}

/// Record the current length of the participation best-effort queue.
///
/// Sets the `participation_best_effort_queue_size` gauge to `size`. Does nothing
/// when this handle carries no inner metrics (`self.0` is `None`).
pub fn report_best_effort_queue_size(&self, size: u64) {
    if let Some(inner) = self.0.as_ref() {
        inner.participation_best_effort_queue_size.set(size);
    }
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -163,6 +203,34 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
participation_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_durations",
"Time spent within fn Participation::participate",
sandreim marked this conversation as resolved.
Show resolved Hide resolved
)
)?,
registry,
)?,
participation_pipeline_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_pipeline_durations",
"Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.",
)
)?,
registry,
)?,
participation_priority_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should the metric names be updated to match the new variable names? e.g. "polkadot_parachain_dispute_participation_priority_queue_size"

"Number of disputes waiting for local participation in the priority queue.")?,
registry,
)?,
participation_best_effort_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_best_effort_queue_size",
"Number of disputes waiting for local participation in the best effort queue.")?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
Expand Down
26 changes: 21 additions & 5 deletions node/core/dispute-coordinator/src/participation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ mod queues;
use queues::Queues;
pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

/// How many participation processes do we want to run in parallel the most.
///
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
Expand All @@ -69,6 +72,8 @@ pub struct Participation {
worker_sender: WorkerMessageSender,
/// Some recent block for retrieving validation code from chain.
recent_block: Option<(BlockNumber, Hash)>,
/// Metrics handle cloned from Initialized
metrics: Metrics,
}

/// Message from worker tasks.
Expand Down Expand Up @@ -133,12 +138,13 @@ impl Participation {
/// The passed in sender will be used by background workers to communicate back their results.
/// The calling context should make sure to call `Participation::on_worker_message()` for the
/// received messages.
pub fn new(sender: WorkerMessageSender) -> Self {
pub fn new(sender: WorkerMessageSender, metrics: Metrics) -> Self {
Self {
running_participations: HashSet::new(),
queue: Queues::new(),
queue: Queues::new(metrics.clone()),
eskimor marked this conversation as resolved.
Show resolved Hide resolved
worker_sender: sender,
recent_block: None,
metrics,
}
}

Expand Down Expand Up @@ -235,7 +241,8 @@ impl Participation {
recent_head: Hash,
) -> FatalResult<()> {
while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
if let Some(req) = self.queue.dequeue() {
let maybe_req = self.queue.dequeue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why do we need this variable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! That was an artifact from a discarded implementation.

if let Some(req) = maybe_req {
self.fork_participation(ctx, req, recent_head)?;
} else {
break
Expand All @@ -251,11 +258,19 @@ impl Participation {
req: ParticipationRequest,
recent_head: Hash,
) -> FatalResult<()> {
let participation_timer = self.metrics.time_participation();
if self.running_participations.insert(*req.candidate_hash()) {
let sender = ctx.sender().clone();
ctx.spawn(
"participation-worker",
participate(self.worker_sender.clone(), sender, recent_head, req).boxed(),
participate(
self.worker_sender.clone(),
sender,
recent_head,
req,
participation_timer,
)
.boxed(),
)
.map_err(FatalError::SpawnFailed)?;
}
Expand All @@ -267,7 +282,8 @@ async fn participate(
mut result_sender: WorkerMessageSender,
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
block_hash: Hash,
req: ParticipationRequest,
req: ParticipationRequest, // Sends metric data via request_timer field when dropped
_participation_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
) {
#[cfg(test)]
// Hack for tests, so we get recovery messages not too early.
Expand Down
55 changes: 48 additions & 7 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand All @@ -25,6 +25,9 @@ use crate::{
LOG_TARGET,
};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -56,14 +59,18 @@ pub struct Queues {

/// Priority queue.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Handle for recording queues data in metrics
metrics: Metrics,
}

/// A dispute participation request that can be queued.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Clone)]
tdimitrov marked this conversation as resolved.
Show resolved Hide resolved
pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Arc<Option<prometheus::HistogramTimer>>, // Sends metric data when request is dropped
}

/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
Expand Down Expand Up @@ -107,8 +114,17 @@ pub enum QueueError {

impl ParticipationRequest {
/// Create a new `ParticipationRequest` to be queued.
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session }
/// Build a `ParticipationRequest` ready to be queued.
///
/// The candidate hash is computed once from `candidate_receipt` and stored next to
/// the receipt and `session`. `request_timer` is only held by the request (in
/// `_request_timer`); it is never read here.
pub fn new(
    candidate_receipt: CandidateReceipt,
    session: SessionIndex,
    request_timer: Arc<Option<prometheus::HistogramTimer>>,
) -> Self {
    // Hash first, while the receipt is still borrowed, then move the receipt in.
    let candidate_hash = candidate_receipt.hash();
    Self { candidate_hash, candidate_receipt, session, _request_timer: request_timer }
}

pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
Expand All @@ -124,12 +140,26 @@ impl ParticipationRequest {
let Self { candidate_hash, candidate_receipt, .. } = self;
(candidate_hash, candidate_receipt)
}
// For tests we want to check whether requests are equal, but the
// request_timer field of ParticipationRequest doesn't implement
// eq. This helper checks whether all other fields are equal,
// which is sufficient.
#[cfg(test)]
pub fn functionally_equal(&self, other: ParticipationRequest) -> bool {
Copy link
Contributor

@mrcnski mrcnski Mar 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I had an internal struggle about this, but I think we should just impl eq on this type. I believe that already implies functional equality. I read the docs for PartialEq, and there is no restriction mentioned about having to include every field in the equality comparison.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! I hadn't thought of it that way. But it does make sense. I shall make changes.

if &self.candidate_receipt == other.candidate_receipt() &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd implement this by destructuring (unless it's not possible due to something I'm not seeing right now). This way we'll get a compilation error if a new field is added to the struct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can't we implement std::cmp::PartialEq::eq for ParticipationRequest so that we get slightly nicer-looking syntax?

&self.candidate_hash == other.candidate_hash() &&
self.session == other.session()
{
return true
}
false
}
}

impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
/// Build an empty set of queues.
///
/// Both the priority and the best-effort queue start out empty; `metrics` is
/// stored so that queue data can be recorded through it.
pub fn new(metrics: Metrics) -> Self {
    let priority = BTreeMap::new();
    let best_effort = BTreeMap::new();
    Self { best_effort, priority, metrics }
}

/// Will put message in queue, either priority or best effort depending on priority.
Expand All @@ -154,9 +184,14 @@ impl Queues {
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
self.metrics.report_priority_queue_size(self.priority.len() as u64);
return Some(req.1)
}
if let Some(req) = self.pop_best_effort() {
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
return Some(req.1)
}
self.pop_best_effort().map(|d| d.1)
None
}

/// Reprioritizes any participation requests pertaining to the
Expand All @@ -180,6 +215,9 @@ impl Queues {
}
if let Some(request) = self.best_effort.remove(&comparator) {
self.priority.insert(comparator, request);
// Report changes to both queue sizes
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}
Expand All @@ -197,6 +235,8 @@ impl Queues {
// Remove any best effort entry:
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
} else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
Expand All @@ -207,6 +247,7 @@ impl Queues {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}
Expand Down
Loading