Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Handling timers for repeat dispute participation requests (#6901)
Browse files Browse the repository at this point in the history
* Added participation and queue sizes metrics

* First draft of all metric code

* Tests pass

* Changed Metrics to field on participation + queues

* fmt

* Improving naming

* Refactor, placing timer in ParticipationRequest

* fmt

* Final cleanup

* Revert "Final cleanup"

This reverts commit 02e5608.

* Changing metric names

* Implementing Eq only for unit tests

* fmt

* Removing Clone trait from ParticipationRequest

* fmt

* Moved clone functionality to tests helper

* fmt

* Fixing dropped timers on repeat requests

* Keep older best effort timers

* Removing comment redundency and explaining better

* Updating queue() to use single mem read

* fmt
  • Loading branch information
BradleyOlson64 authored Mar 21, 2023
1 parent 435e50b commit ad0c8a9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 21 deletions.
70 changes: 50 additions & 20 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{cmp::Ordering, collections::BTreeMap};
use std::{
cmp::Ordering,
collections::{btree_map::Entry, BTreeMap},
};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand Down Expand Up @@ -70,7 +73,7 @@ pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when request is dropped
request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when request is dropped
}

/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
Expand Down Expand Up @@ -119,12 +122,7 @@ impl ParticipationRequest {
session: SessionIndex,
request_timer: Option<prometheus::HistogramTimer>,
) -> Self {
Self {
candidate_hash: candidate_receipt.hash(),
candidate_receipt,
session,
_request_timer: request_timer,
}
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, request_timer }
}

pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
Expand All @@ -147,15 +145,11 @@ impl ParticipationRequest {
#[cfg(test)]
impl PartialEq for ParticipationRequest {
fn eq(&self, other: &Self) -> bool {
let ParticipationRequest {
candidate_receipt,
candidate_hash,
session: _session,
_request_timer,
} = self;
let ParticipationRequest { candidate_receipt, candidate_hash, session, request_timer: _ } =
self;
candidate_receipt == other.candidate_receipt() &&
candidate_hash == other.candidate_hash() &&
self.session == other.session()
*session == other.session()
}
}
#[cfg(test)]
Expand Down Expand Up @@ -227,19 +221,46 @@ impl Queues {
Ok(())
}

/// Will put message in queue, either priority or best effort depending on priority.
///
/// If the message was already previously present on best effort, it will be moved to priority
/// if it is considered priority now.
///
/// Returns error in case a queue was found full already.
///
/// # Request timers
///
/// [`ParticipationRequest`]s contain request timers.
/// Where an old request would be replaced by a new one, we keep the old request.
/// This prevents request timers from resetting on each new request.
fn queue_with_comparator(
&mut self,
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
mut req: ParticipationRequest,
) -> std::result::Result<(), QueueError> {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
// Remove any best effort entry, using it to replace our new
// request.
if let Some(older_request) = self.best_effort.remove(&comparator) {
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
}
req = older_request;
}
// Keeping old request if any.
match self.priority.entry(comparator) {
Entry::Occupied(_) =>
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
},
Entry::Vacant(vac) => {
vac.insert(req);
},
}
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
} else {
Expand All @@ -251,7 +272,16 @@ impl Queues {
if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
// Keeping old request if any.
match self.best_effort.entry(comparator) {
Entry::Occupied(_) =>
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
},
Entry::Vacant(vac) => {
vac.insert(req);
},
}
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn clone_request(request: &ParticipationRequest) -> ParticipationRequest {
candidate_receipt: request.candidate_receipt.clone(),
candidate_hash: request.candidate_hash.clone(),
session: request.session,
_request_timer: None,
request_timer: None,
}
}

Expand Down

0 comments on commit ad0c8a9

Please sign in to comment.