Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Overseer: subsystems communicate directly #2227

Merged
23 commits merged into from
Mar 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions node/metered-channel/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use super::Meter;


/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn channel<T>(capacity: usize, name: &'static str) -> (MeteredSender<T>, MeteredReceiver<T>) {
pub fn channel<T>(capacity: usize) -> (MeteredSender<T>, MeteredReceiver<T>) {
let (tx, rx) = mpsc::channel(capacity);
let mut shared_meter = Meter::default();
shared_meter.name = name;
let shared_meter = Meter::default();
let tx = MeteredSender { meter: shared_meter.clone(), inner: tx };
let rx = MeteredReceiver { meter: shared_meter, inner: rx };
(tx, rx)
Expand Down
17 changes: 5 additions & 12 deletions node/metered-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ pub use self::unbounded::*;
/// A peek into the inner state of a meter.
#[derive(Debug, Clone, Default)]
pub struct Meter {
/// Name of the receiver and sender pair.
name: &'static str,
// Number of sends on this channel.
sent: Arc<AtomicUsize>,
// Number of receives on this channel.
Expand Down Expand Up @@ -60,11 +58,6 @@ impl Meter {
}
}

/// Obtain the name of the channel `Sender` and `Receiver` pair.
pub fn name(&self) -> &'static str {
self.name
}

fn note_sent(&self) {
self.sent.fetch_add(1, Ordering::Relaxed);
}
Expand Down Expand Up @@ -92,7 +85,7 @@ mod tests {
#[test]
fn try_send_try_next() {
block_on(async move {
let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
let (mut tx, mut rx) = channel::<Msg>(5);
let msg = Msg::default();
assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 });
tx.try_send(msg).unwrap();
Expand All @@ -116,7 +109,7 @@ mod tests {
fn with_tasks() {
let (ready, go) = futures::channel::oneshot::channel();

let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
let (mut tx, mut rx) = channel::<Msg>(5);
block_on(async move {
futures::join!(
async move {
Expand Down Expand Up @@ -149,7 +142,7 @@ mod tests {

#[test]
fn stream_and_sink() {
let (mut tx, mut rx) = channel::<Msg>(5, "goofy");
let (mut tx, mut rx) = channel::<Msg>(5);

block_on(async move {
futures::join!(
Expand All @@ -175,8 +168,8 @@ mod tests {

#[test]
fn failed_send_does_not_inc_sent() {
let (mut bounded, _) = channel::<Msg>(5, "pluto");
let (mut unbounded, _) = unbounded::<Msg>("pluto");
let (mut bounded, _) = channel::<Msg>(5);
let (mut unbounded, _) = unbounded::<Msg>();

block_on(async move {
assert!(bounded.send(Msg::default()).await.is_err());
Expand Down
7 changes: 3 additions & 4 deletions node/metered-channel/src/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use super::Meter;


/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn unbounded<T>(name: &'static str) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
pub fn unbounded<T>() -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
let (tx, rx) = mpsc::unbounded();
let mut shared_meter = Meter::default();
shared_meter.name = name;
let shared_meter = Meter::default();
let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx };
let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx };
(tx, rx)
Expand Down Expand Up @@ -147,7 +146,7 @@ impl<T> UnboundedMeteredSender<T> {


/// Attempt to send message or fail immediately.
pub fn unbounded_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
self.meter.note_sent();
self.inner.unbounded_send(msg).map_err(|e| {
self.meter.retract_sent();
Expand Down
2 changes: 1 addition & 1 deletion node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ mod tests {
TestAuthorityDiscovery,
) {
let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
let (action_tx, action_rx) = metered::unbounded("test_action");
let (action_tx, action_rx) = metered::unbounded();

(
TestNetwork {
Expand Down
3 changes: 1 addition & 2 deletions node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ async-trait = "0.1.42"
client = { package = "sc-client-api", git = "https:/paritytech/substrate", branch = "master" }
futures = "0.3.12"
futures-timer = "3.0.2"
oorandom = "11.1.3"
polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" }
polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-primitives = { path = "../../primitives" }
Expand All @@ -20,6 +19,6 @@ tracing = "0.1.25"
sp-core = { git = "https:/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../network/protocol" }
futures = { version = "0.3.12", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.1.1"
kv-log-macro = "1.0.7"
assert_matches = "1.4.0"
Loading