diff --git a/Cargo.lock b/Cargo.lock index eb9ef2b19ab4..90a541c8805b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,17 @@ dependencies = [ "webpki-roots 0.19.0", ] +[[package]] +name = "async-trait" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a265e3abeffdce30b2e26b7a11b222fe37c6067404001b434101457d0385eb92" +dependencies = [ + "proc-macro2 1.0.18", + "quote 1.0.7", + "syn 1.0.31", +] + [[package]] name = "atty" version = "0.2.14" @@ -4250,6 +4261,25 @@ dependencies = [ "wasm-timer", ] +[[package]] +name = "polkadot-network-bridge" +version = "0.1.0" +dependencies = [ + "assert_matches", + "futures 0.3.5", + "futures-timer 3.0.2", + "log 0.4.8", + "parity-scale-codec", + "parking_lot 0.10.2", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-primitives", + "polkadot-subsystem-test-helpers", + "sc-network", + "sp-runtime", + "streamunordered", +] + [[package]] name = "polkadot-network-test" version = "0.8.13" @@ -4272,36 +4302,39 @@ dependencies = [ ] [[package]] -name = "polkadot-node-messages" +name = "polkadot-node-primitives" version = "0.1.0" dependencies = [ - "futures 0.3.5", - "polkadot-node-primitives", + "async-trait", + "parity-scale-codec", "polkadot-primitives", "polkadot-statement-table", - "sc-network", + "sp-runtime", ] [[package]] -name = "polkadot-node-primitives" +name = "polkadot-node-subsystem" version = "0.1.0" dependencies = [ - "parity-scale-codec", + "async-trait", + "futures 0.3.5", + "polkadot-node-primitives", "polkadot-primitives", "polkadot-statement-table", - "sp-runtime", + "sc-network", ] [[package]] name = "polkadot-overseer" version = "0.1.0" dependencies = [ + "async-trait", "femme", "futures 0.3.5", "futures-timer 3.0.2", "kv-log-macro", "log 0.4.8", - "polkadot-node-messages", + "polkadot-node-subsystem", "polkadot-primitives", "sc-client-api", "streamunordered", @@ -4613,6 +4646,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.9.0", "polkadot-network", + "polkadot-node-subsystem", "polkadot-overseer", "polkadot-primitives", "polkadot-rpc", @@ -4662,6 +4696,16 @@ dependencies = [ "sp-core", ] +[[package]] +name = "polkadot-subsystem-test-helpers" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures 0.3.5", + "parking_lot 0.10.2", + "polkadot-node-subsystem", +] + [[package]] name = "polkadot-test-runtime" version = "0.8.13" diff --git a/Cargo.toml b/Cargo.toml index 34f9f14e4e05..4a72ae8dc6f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,10 +42,12 @@ members = [ "service", "validation", - "node/messages", + "node/network/bridge", "node/overseer", "node/primitives", "node/service", + "node/subsystem", + "node/test-helpers/subsystem", "parachain/test-parachains", "parachain/test-parachains/adder", diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml new file mode 100644 index 000000000000..4f6c8631e2f9 --- /dev/null +++ b/node/network/bridge/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "polkadot-network-bridge" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +futures = "0.3.5" +log = "0.4.8" +futures-timer = "3.0.2" +streamunordered = "0.5.1" +polkadot-primitives = { path = "../../../primitives" } +node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } +parity-scale-codec = "1.3.0" +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } 
+polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
+
+[dev-dependencies]
+parking_lot = "0.10.0"
+subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
+assert_matches = "1.3.0"
diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs
new file mode 100644
index 000000000000..aef1632a9439
--- /dev/null
+++ b/node/network/bridge/src/lib.rs
@@ -0,0 +1,912 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.
+
+use parity_scale_codec::{Encode, Decode};
+use futures::prelude::*;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+
+use sc_network::{
+	ObservedRole, ReputationChange, PeerId,
+	Event as NetworkEvent,
+};
+use sp_runtime::ConsensusEngineId;
+
+use polkadot_subsystem::{
+	FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
+	SubsystemResult,
+};
+use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages};
+use node_primitives::{ProtocolId, View};
+use polkadot_primitives::{Block, Hash};
+
+use std::collections::btree_map::{BTreeMap, Entry as BEntry};
+use std::collections::hash_map::{HashMap, Entry as HEntry};
+use std::pin::Pin;
+use std::sync::Arc;
+
+/// The maximum amount of heads a peer is allowed to have in their view at any time.
+///
+/// We use the same limit to compute the view sent to peers locally.
+const MAX_VIEW_HEADS: usize = 5;
+
+/// The engine ID of the polkadot network protocol.
+pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2";
+/// The protocol name.
+pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2";
+
+const MALFORMED_MESSAGE_COST: ReputationChange
+	= ReputationChange::new(-500, "Malformed Network-bridge message");
+const UNKNOWN_PROTO_COST: ReputationChange
+	= ReputationChange::new(-50, "Message sent to unknown protocol");
+const MALFORMED_VIEW_COST: ReputationChange
+	= ReputationChange::new(-500, "Malformed view");
+
+/// Messages received on the network.
+#[derive(Debug, Encode, Decode, Clone)]
+pub enum WireMessage {
+	/// A message from a peer on a specific protocol.
+	#[codec(index = "1")]
+	ProtocolMessage(ProtocolId, Vec<u8>),
+	/// A view update from a peer.
+	#[codec(index = "2")]
+	ViewUpdate(View),
+}
+
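+/// Information about the notifications protocol. Should be used during network configuration
+/// or shortly after startup to register the protocol with the network service.
+///
+/// A hedged sketch of that registration (the exact `sc-network` configuration API is
+/// assumed here, not prescribed by this crate):
+///
+/// ```ignore
+/// let (engine_id, protocol_name) = notifications_protocol_info();
+/// // `network_config` is assumed to be an `sc_network::config::NetworkConfiguration`;
+/// // the field name is illustrative.
+/// network_config.notifications_protocols.push((engine_id, protocol_name));
+/// ```
+pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'static, [u8]>) {
+	(POLKADOT_ENGINE_ID, POLKADOT_PROTOCOL_NAME.into())
+}
+
+/// An action to be carried out by the network.
+#[derive(PartialEq)]
+pub enum NetworkAction {
+	/// Note a change in reputation for a peer.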
+ ReputationChange(PeerId, ReputationChange), + /// Write a notification to a given peer. + WriteNotification(PeerId, Vec), +} + +/// An abstraction over networking for the purposes of this subsystem. +pub trait Network: Send + 'static { + /// Get a stream of all events occurring on the network. This may include events unrelated + /// to the Polkadot protocol - the user of this function should filter only for events related + /// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID). + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>; + + /// Get access to an underlying sink for all network actions. + fn action_sink<'a>(&'a mut self) -> Pin< + Box + Send + 'a> + >; + + /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. + fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange) + -> BoxFuture> + { + async move { + self.action_sink().send(NetworkAction::ReputationChange(who, cost_benefit)).await + }.boxed() + } + + /// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic. + fn write_notification(&mut self, who: PeerId, message: Vec) + -> BoxFuture> + { + async move { + self.action_sink().send(NetworkAction::WriteNotification(who, message)).await + }.boxed() + } +} + +impl Network for Arc> { + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { + sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed() + } + + fn action_sink<'a>(&'a mut self) + -> Pin + Send + 'a>> + { + use futures::task::{Poll, Context}; + + // wrapper around a NetworkService to make it act like a sink. + struct ActionSink<'b>(&'b sc_network::NetworkService); + + impl<'b> Sink for ActionSink<'b> { + type Error = SubsystemError; + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> { + match action { + NetworkAction::ReputationChange(peer, cost_benefit) => self.0.report_peer( + peer, + cost_benefit, + ), + NetworkAction::WriteNotification(peer, message) => self.0.write_notification( + peer, + POLKADOT_ENGINE_ID, + message, + ), + } + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + } + + Box::pin(ActionSink(&**self)) + } +} + +/// The network bridge subsystem. +pub struct NetworkBridge(Option); + +impl NetworkBridge { + /// Create a new network bridge subsystem with underlying network service. + /// + /// This assumes that the network service has had the notifications protocol for the network + /// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info). + pub fn new(net_service: N) -> Self { + NetworkBridge(Some(net_service)) + } +} + +impl Subsystem for NetworkBridge + where + Net: Network, + Context: SubsystemContext, +{ + fn start(&mut self, mut ctx: Context) -> SpawnedSubsystem { + SpawnedSubsystem(match self.0.take() { + None => async move { for _ in ctx.recv().await { } }.boxed(), + Some(net) => { + // Swallow error because failure is fatal to the node and we log with more precision + // within `run_network`. + run_network(net, ctx).map(|_| ()).boxed() + } + }) + + + + } +} + +struct PeerData { + /// Latest view sent by the peer. + view: View, + /// The role of the peer. 
+ role: ObservedRole, +} + +#[derive(Debug)] +enum Action { + RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), + SendMessage(Vec, ProtocolId, Vec), + ReportPeer(PeerId, ReputationChange), + StartWork(Hash), + StopWork(Hash), + + PeerConnected(PeerId, ObservedRole), + PeerDisconnected(PeerId), + PeerMessages(PeerId, Vec), + + Abort, +} + +fn action_from_overseer_message( + res: polkadot_subsystem::SubsystemResult>, +) -> Action { + match res { + Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))) + => Action::StartWork(relay_parent), + Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))) + => Action::StopWork(relay_parent), + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort, + Ok(FromOverseer::Communication { msg }) => match msg { + NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer) + => Action::RegisterEventProducer(protocol_id, message_producer), + NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), + NetworkBridgeMessage::SendMessage(peers, protocol, message) + => Action::SendMessage(peers, protocol, message), + }, + Err(e) => { + log::warn!("Shutting down Network Bridge due to error {:?}", e); + Action::Abort + } + } +} + +fn action_from_network_message(event: Option) -> Option { + match event { + None => { + log::info!("Shutting down Network Bridge: underlying event stream concluded"); + Some(Action::Abort) + } + Some(NetworkEvent::Dht(_)) => None, + Some(NetworkEvent::NotificationStreamOpened { remote, engine_id, role }) => { + if engine_id == POLKADOT_ENGINE_ID { + Some(Action::PeerConnected(remote, role)) + } else { + None + } + } + Some(NetworkEvent::NotificationStreamClosed { remote, engine_id }) => { + if engine_id == POLKADOT_ENGINE_ID { + Some(Action::PeerDisconnected(remote)) + } else { + None + } + } + Some(NetworkEvent::NotificationsReceived { remote, messages }) => { + let v: Result, _> = messages.iter() + .filter(|(engine_id, _)| engine_id == &POLKADOT_ENGINE_ID) + .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .collect(); + + match v { + Err(_) => Some(Action::ReportPeer(remote, MALFORMED_MESSAGE_COST)), + Ok(v) => if v.is_empty() { + None + } else { + Some(Action::PeerMessages(remote, v)) + } + } + } + } +} + +fn construct_view(live_heads: &[Hash]) -> View { + View(live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect()) +} + +async fn dispatch_update_to_all( + update: NetworkBridgeEvent, + event_producers: impl IntoIterator AllMessages>, + ctx: &mut impl SubsystemContext, +) -> polkadot_subsystem::SubsystemResult<()> { + // collect messages here to avoid the borrow lasting across await boundary. 
+ let messages: Vec<_> = event_producers.into_iter() + .map(|producer| producer(update.clone())) + .collect(); + + ctx.send_messages(messages).await +} + +async fn update_view( + peers: &HashMap, + live_heads: &[Hash], + net: &mut impl Network, + local_view: &mut View, +) -> SubsystemResult> { + let new_view = construct_view(live_heads); + if *local_view == new_view { return Ok(None) } + *local_view = new_view.clone(); + + let message = WireMessage::ViewUpdate(new_view.clone()).encode(); + + let notifications = peers.keys().cloned() + .map(move |peer| Ok(NetworkAction::WriteNotification(peer, message.clone()))); + + net.action_sink().send_all(&mut stream::iter(notifications)).await?; + + Ok(Some(NetworkBridgeEvent::OurViewChange(local_view.clone()))) +} + +async fn run_network( + mut net: N, + mut ctx: impl SubsystemContext, +) -> SubsystemResult<()> { + let mut event_stream = net.event_stream().fuse(); + + // Most recent heads are at the back. + let mut live_heads = Vec::with_capacity(MAX_VIEW_HEADS); + let mut local_view = View(Vec::new()); + + let mut peers: HashMap = HashMap::new(); + let mut event_producers = BTreeMap::new(); + + loop { + let action = { + let subsystem_next = ctx.recv().fuse(); + let mut net_event_next = event_stream.next().fuse(); + futures::pin_mut!(subsystem_next); + + let action = futures::select! { + subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)), + net_event = net_event_next => action_from_network_message(net_event), + }; + + match action { + Some(a) => a, + None => continue, + } + }; + + match action { + Action::RegisterEventProducer(protocol_id, event_producer) => { + // insert only if none present. + if let BEntry::Vacant(entry) = event_producers.entry(protocol_id) { + let event_producer = entry.insert(event_producer); + + // send the event producer information on all connected peers. + let mut messages = Vec::with_capacity(peers.len() * 2); + for (peer, data) in &peers { + messages.push(event_producer( + NetworkBridgeEvent::PeerConnected(peer.clone(), data.role.clone()) + )); + + messages.push(event_producer( + NetworkBridgeEvent::PeerViewChange(peer.clone(), data.view.clone()) + )); + } + + ctx.send_messages(messages).await?; + } + } + Action::SendMessage(peers, protocol, message) => { + let mut message_producer = stream::iter({ + let n_peers = peers.len(); + let mut message = Some( + WireMessage::ProtocolMessage(protocol, message).encode() + ); + + peers.iter().cloned().enumerate().map(move |(i, peer)| { + // optimization: avoid cloning the message for the last peer in the + // list. The message payload can be quite large. If the underlying + // network used `Bytes` this would not be necessary. + let message = if i == n_peers - 1 { + message.take() + .expect("Only taken in last iteration of loop, never afterwards; qed") + } else { + message.as_ref() + .expect("Only taken in last iteration of loop, we are not there yet; qed") + .clone() + }; + + Ok(NetworkAction::WriteNotification(peer, message)) + }) + }); + + net.action_sink().send_all(&mut message_producer).await?; + } + Action::ReportPeer(peer, rep) => { + net.report_peer(peer, rep).await?; + } + Action::StartWork(relay_parent) => { + live_heads.push(relay_parent); + if let Some(view_update) + = update_view(&peers, &live_heads, &mut net, &mut local_view).await? 
+ { + if let Err(e) = dispatch_update_to_all( + view_update, + event_producers.values(), + &mut ctx, + ).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return Err(e) + } + } + } + Action::StopWork(relay_parent) => { + live_heads.retain(|h| h != &relay_parent); + if let Some(view_update) + = update_view(&peers, &live_heads, &mut net, &mut local_view).await? + { + if let Err(e) = dispatch_update_to_all( + view_update, + event_producers.values(), + &mut ctx, + ).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return Err(e) + } + } + } + + Action::PeerConnected(peer, role) => { + match peers.entry(peer.clone()) { + HEntry::Occupied(_) => continue, + HEntry::Vacant(vacant) => { + vacant.insert(PeerData { + view: View(Vec::new()), + role: role.clone(), + }); + + if let Err(e) = dispatch_update_to_all( + NetworkBridgeEvent::PeerConnected(peer, role), + event_producers.values(), + &mut ctx, + ).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return Err(e) + } + } + } + } + Action::PeerDisconnected(peer) => { + if peers.remove(&peer).is_some() { + if let Err(e) = dispatch_update_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + event_producers.values(), + &mut ctx, + ).await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return Err(e) + } + } + }, + Action::PeerMessages(peer, messages) => { + let peer_data = match peers.get_mut(&peer) { + None => continue, + Some(d) => d, + }; + + let mut outgoing_messages = Vec::with_capacity(messages.len()); + for message in messages { + match message { + WireMessage::ViewUpdate(new_view) => { + if new_view.0.len() > MAX_VIEW_HEADS { + net.report_peer( + peer.clone(), + MALFORMED_VIEW_COST, + ).await?; + + continue + } + + if new_view == peer_data.view { continue } + peer_data.view = new_view; + + let update = NetworkBridgeEvent::PeerViewChange( + peer.clone(), + peer_data.view.clone(), + ); + + outgoing_messages.extend( + event_producers.values().map(|producer| producer(update.clone())) + ); + } + WireMessage::ProtocolMessage(protocol, message) => { + let message = match event_producers.get(&protocol) { + Some(producer) => Some(producer( + NetworkBridgeEvent::PeerMessage(peer.clone(), message) + )), + None => { + net.report_peer( + peer.clone(), + UNKNOWN_PROTO_COST, + ).await?; + + None + } + }; + + if let Some(message) = message { + outgoing_messages.push(message); + } + } + } + } + + let send_messages = ctx.send_messages(outgoing_messages); + if let Err(e) = send_messages.await { + log::warn!("Aborting - Failure to dispatch messages to overseer"); + return Err(e) + } + }, + + Action::Abort => return Ok(()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::channel::mpsc; + use futures::executor::{self, ThreadPool}; + + use std::sync::Arc; + use parking_lot::Mutex; + use assert_matches::assert_matches; + + use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage}; + use subsystem_test::{SingleItemSink, SingleItemStream}; + + // The subsystem's view of the network - only supports a single call to `event_stream`. + struct TestNetwork { + net_events: Arc>>>, + action_tx: mpsc::UnboundedSender, + } + + // The test's view of the network. This receives updates from the subsystem in the form + // of `NetworkAction`s. 
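+
+	// A hedged aside (not in the original patch): the wire format is plain SCALE,
+	// so a `WireMessage` should round-trip through `encode`/`decode`. The test
+	// name is illustrative only.
+	#[test]
+	fn wire_message_decodes_what_it_encodes() {
+		let msg = WireMessage::ViewUpdate(View(vec![Hash::from([7; 32])]));
+		let encoded = msg.encode();
+
+		match WireMessage::decode(&mut encoded.as_slice()) {
+			Ok(WireMessage::ViewUpdate(view)) => assert_eq!(view, View(vec![Hash::from([7; 32])])),
+			other => panic!("view update did not round-trip: {:?}", other),
+		}
+	}
+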
+ struct TestNetworkHandle { + action_rx: mpsc::UnboundedReceiver, + net_tx: SingleItemSink, + } + + fn new_test_network() -> ( + TestNetwork, + TestNetworkHandle, + ) { + let (net_tx, net_rx) = subsystem_test::single_item_sink(); + let (action_tx, action_rx) = mpsc::unbounded(); + + ( + TestNetwork { + net_events: Arc::new(Mutex::new(Some(net_rx))), + action_tx, + }, + TestNetworkHandle { + action_rx, + net_tx, + }, + ) + } + + impl Network for TestNetwork { + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { + self.net_events.lock() + .take() + .expect("Subsystem made more than one call to `event_stream`") + .boxed() + } + + fn action_sink<'a>(&'a mut self) + -> Pin + Send + 'a>> + { + Box::pin((&mut self.action_tx).sink_map_err(Into::into)) + } + } + + impl TestNetworkHandle { + // Get the next network action. + async fn next_network_action(&mut self) -> NetworkAction { + self.action_rx.next().await.expect("subsystem concluded early") + } + + // Wait for the next N network actions. + async fn next_network_actions(&mut self, n: usize) -> Vec { + let mut v = Vec::with_capacity(n); + for _ in 0..n { + v.push(self.next_network_action().await); + } + + v + } + + async fn connect_peer(&mut self, peer: PeerId, role: ObservedRole) { + self.send_network_event(NetworkEvent::NotificationStreamOpened { + remote: peer, + engine_id: POLKADOT_ENGINE_ID, + role, + }).await; + } + + async fn disconnect_peer(&mut self, peer: PeerId) { + self.send_network_event(NetworkEvent::NotificationStreamClosed { + remote: peer, + engine_id: POLKADOT_ENGINE_ID, + }).await; + } + + async fn peer_message(&mut self, peer: PeerId, message: Vec) { + self.send_network_event(NetworkEvent::NotificationsReceived { + remote: peer, + messages: vec![(POLKADOT_ENGINE_ID, message.into())], + }).await; + } + + async fn send_network_event(&mut self, event: NetworkEvent) { + self.net_tx.send(event).await.expect("subsystem concluded early"); + } + } + + // network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so + // we need to use this to prevent fragile reliance on peer ordering. 
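+
+	// A hedged aside (not in the original patch): `construct_view` should keep only
+	// the `MAX_VIEW_HEADS` most recent live heads, most recent first. The test name
+	// is illustrative only.
+	#[test]
+	fn constructed_view_prefers_recent_heads() {
+		let heads: Vec<_> = (0u8..7).map(|i| Hash::from([i; 32])).collect();
+		let view = construct_view(&heads);
+
+		assert_eq!(view.0.len(), MAX_VIEW_HEADS);
+		assert_eq!(view.0[0], Hash::from([6; 32]));
+	}
+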
+ fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool { + actions.iter().find(|&x| x == action).is_some() + } + + struct TestHarness { + network_handle: TestNetworkHandle, + virtual_overseer: subsystem_test::TestSubsystemContextHandle, + } + + fn test_harness>(test: impl FnOnce(TestHarness) -> T) { + let pool = ThreadPool::new().unwrap(); + + let (network, network_handle) = new_test_network(); + let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool); + + let network_bridge = run_network( + network, + context, + ) + .map_err(|_| panic!("subsystem execution failed")) + .map(|_| ()); + + let test_fut = test(TestHarness { + network_handle, + virtual_overseer, + }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(network_bridge); + + executor::block_on(future::select(test_fut, network_bridge)); + } + + #[test] + fn sends_view_updates_to_peers() { + test_harness(|test_harness| async move { + let TestHarness { mut network_handle, mut virtual_overseer } = test_harness; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + network_handle.connect_peer(peer_a.clone(), ObservedRole::Full).await; + network_handle.connect_peer(peer_b.clone(), ObservedRole::Full).await; + + let hash_a = Hash::from([1; 32]); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))).await; + + let actions = network_handle.next_network_actions(2).await; + let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode(); + assert!(network_actions_contains( + &actions, + &NetworkAction::WriteNotification(peer_a, wire_message.clone()), + )); + + assert!(network_actions_contains( + &actions, + &NetworkAction::WriteNotification(peer_b, wire_message.clone()), + )); + }); + } + + #[test] + fn peer_view_updates_sent_via_overseer() { + test_harness(|test_harness| async move { + let TestHarness { + mut network_handle, + mut virtual_overseer, + } = test_harness; + + let peer = PeerId::random(); + + let proto_statement = *b"abcd"; + let proto_bitfield = *b"wxyz"; + + network_handle.connect_peer(peer.clone(), ObservedRole::Full).await; + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer( + proto_statement, + |event| AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate(event) + ) + ), + }).await; + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer( + proto_bitfield, + |event| AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate(event) + ) + ), + }).await; + + let view = View(vec![Hash::from([1u8; 32])]); + + // bridge will inform about all previously-connected peers. 
+ { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) + ) + ) if p == peer + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) if p == peer && v == View(Default::default()) + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) + ) + ) if p == peer + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) if p == peer && v == View(Default::default()) + ); + } + + network_handle.peer_message( + peer.clone(), + WireMessage::ViewUpdate(view.clone()).encode(), + ).await; + + // statement distribution message comes first because handlers are ordered by + // protocol ID. + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) => { + assert_eq!(p, peer); + assert_eq!(v, view); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) => { + assert_eq!(p, peer); + assert_eq!(v, view); + } + ); + }); + } + + #[test] + fn peer_messages_sent_via_overseer() { + test_harness(|test_harness| async move { + let TestHarness { + mut network_handle, + mut virtual_overseer, + } = test_harness; + + let peer = PeerId::random(); + + let proto_statement = *b"abcd"; + let proto_bitfield = *b"wxyz"; + + network_handle.connect_peer(peer.clone(), ObservedRole::Full).await; + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer( + proto_statement, + |event| AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate(event) + ) + ), + }).await; + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::RegisterEventProducer( + proto_bitfield, + |event| AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate(event) + ) + ), + }).await; + + // bridge will inform about all previously-connected peers. 
+ { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) + ) + ) if p == peer + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) if p == peer && v == View(Default::default()) + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) + ) + ) if p == peer + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(p, v) + ) + ) if p == peer && v == View(Default::default()) + ); + } + + let payload = vec![1, 2, 3]; + + network_handle.peer_message( + peer.clone(), + WireMessage::ProtocolMessage(proto_statement, payload.clone()).encode(), + ).await; + + network_handle.disconnect_peer(peer.clone()).await; + + // statement distribution message comes first because handlers are ordered by + // protocol ID, and then a disconnection event comes - indicating that the message + // was only sent to the correct protocol. + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(p, m) + ) + ) => { + assert_eq!(p, peer); + assert_eq!(m, payload); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerDisconnected(p) + ) + ) => { + assert_eq!(p, peer); + } + ); + }); + } +} diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index 88626e2e05f3..6c6ce304e6d4 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -11,7 +11,8 @@ futures-timer = "3.0.2" streamunordered = "0.5.1" polkadot-primitives = { path = "../../primitives" } client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } -messages = { package = "polkadot-node-messages", path = "../messages" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } +async-trait = "0.1" [dev-dependencies] futures = { version = "0.3.5", features = ["thread-pool"] } diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 77b99a3a3b3f..0edc87a6b8db 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -28,16 +28,17 @@ use futures_timer::Delay; use kv_log_macro as log; use polkadot_primitives::parachain::{BlockData, PoVBlock}; -use polkadot_overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem}; +use polkadot_overseer::Overseer; -use messages::{ - AllMessages, CandidateBackingMessage, FromOverseer, CandidateValidationMessage +use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer}; +use polkadot_subsystem::messages::{ + AllMessages, CandidateBackingMessage, CandidateValidationMessage }; struct Subsystem1; impl Subsystem1 { - async fn run(mut ctx: SubsystemContext) { + async fn run(mut ctx: impl SubsystemContext) { loop { match ctx.try_recv().await { Ok(Some(msg)) => { @@ -56,7 +57,7 @@ impl Subsystem1 { 
Delay::new(Duration::from_secs(1)).await; let (tx, _) = oneshot::channel(); - ctx.send_msg(AllMessages::CandidateValidation( + ctx.send_message(AllMessages::CandidateValidation( CandidateValidationMessage::Validate( Default::default(), Default::default(), @@ -70,8 +71,10 @@ impl Subsystem1 { } } -impl Subsystem for Subsystem1 { - fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for Subsystem1 + where C: SubsystemContext +{ + fn start(&mut self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { Self::run(ctx).await; })) @@ -81,7 +84,7 @@ impl Subsystem for Subsystem1 { struct Subsystem2; impl Subsystem2 { - async fn run(mut ctx: SubsystemContext) { + async fn run(mut ctx: impl SubsystemContext) { ctx.spawn(Box::pin(async { loop { log::info!("Job tick"); @@ -105,8 +108,10 @@ impl Subsystem2 { } } -impl Subsystem for Subsystem2 { - fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for Subsystem2 + where C: SubsystemContext +{ + fn start(&mut self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { Self::run(ctx).await; })) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 0d3c9b7b5095..8fb8706be429 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -65,8 +65,8 @@ use futures::channel::{mpsc, oneshot}; use futures::{ pending, poll, select, future::{BoxFuture, RemoteHandle}, - stream::FuturesUnordered, - task::{Spawn, SpawnError, SpawnExt}, + stream::{self, FuturesUnordered}, + task::{Spawn, SpawnExt}, Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; @@ -75,50 +75,14 @@ use streamunordered::{StreamYield, StreamUnordered}; use polkadot_primitives::{Block, BlockNumber, Hash}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; -pub use messages::{ - OverseerSignal, CandidateValidationMessage, CandidateBackingMessage, AllMessages, - FromOverseer, +use polkadot_subsystem::messages::{ + CandidateValidationMessage, CandidateBackingMessage, AllMessages +}; +pub use polkadot_subsystem::{ + Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, + SpawnedSubsystem, }; -/// An error type that describes faults that may happen -/// -/// These are: -/// * Channels being closed -/// * Subsystems dying when they are not expected to -/// * Subsystems not dying when they are told to die -/// * etc. -#[derive(Debug)] -pub struct SubsystemError; - -impl From for SubsystemError { - fn from(_: mpsc::SendError) -> Self { - Self - } -} - -impl From for SubsystemError { - fn from(_: oneshot::Canceled) -> Self { - Self - } -} - -impl From for SubsystemError { - fn from(_: SpawnError) -> Self { - Self - } -} - -/// A `Result` type that wraps [`SubsystemError`]. -/// -/// [`SubsystemError`]: struct.SubsystemError.html -pub type SubsystemResult = Result; - -/// An asynchronous subsystem task that runs inside and being overseen by the [`Overseer`]. -/// -/// In essence it's just a newtype wrapping a `BoxFuture`. -/// -/// [`Overseer`]: struct.Overseer.html -pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>); // A capacity of bounded channels inside the overseer. const CHANNEL_CAPACITY: usize = 1024; @@ -278,7 +242,7 @@ impl Debug for ToOverseer { /// A running instance of some [`Subsystem`]. 
/// /// [`Subsystem`]: trait.Subsystem.html -struct SubsystemInstance { +struct SubsystemInstance { tx: mpsc::Sender>, } @@ -289,17 +253,17 @@ struct SubsystemInstance { /// [`Overseer`]: struct.Overseer.html /// [`Subsystem`]: trait.Subsystem.html /// [`SubsystemJob`]: trait.SubsystemJob.html -pub struct SubsystemContext{ +#[derive(Debug)] +pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, tx: mpsc::Sender, } -impl SubsystemContext { - /// Try to asyncronously receive a message. - /// - /// This has to be used with caution, if you loop over this without - /// using `pending!()` macro you will end up with a busy loop! - pub async fn try_recv(&mut self) -> Result>, ()> { +#[async_trait::async_trait] +impl SubsystemContext for OverseerSubsystemContext { + type Message = M; + + async fn try_recv(&mut self) -> Result>, ()> { match poll!(self.rx.next()) { Poll::Ready(Some(msg)) => Ok(Some(msg)), Poll::Ready(None) => Err(()), @@ -307,13 +271,11 @@ impl SubsystemContext { } } - /// Receive a message. - pub async fn recv(&mut self) -> SubsystemResult> { + async fn recv(&mut self) -> SubsystemResult> { self.rx.next().await.ok_or(SubsystemError) } - /// Spawn a child task on the executor. - pub async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()> { + async fn spawn(&mut self, s: Pin + Send>>) -> SubsystemResult<()> { let (tx, rx) = oneshot::channel(); self.tx.send(ToOverseer::SpawnJob { s, @@ -323,33 +285,25 @@ impl SubsystemContext { rx.await? } - /// Send a direct message to some other `Subsystem`, routed based on message type. - pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> { + async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { self.tx.send(ToOverseer::SubsystemMessage(msg)).await?; Ok(()) } - fn new(rx: mpsc::Receiver>, tx: mpsc::Sender) -> Self { - Self { - rx, - tx, - } + async fn send_messages(&mut self, msgs: T) -> SubsystemResult<()> + where T: IntoIterator + Send, T::IntoIter: Send + { + let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); + self.tx.send_all(&mut msgs).await?; + + Ok(()) } } -/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`]. -/// -/// It is generic over the message type circulating in the system. -/// The idea that we want some type contaning persistent state that -/// can spawn actually running subsystems when asked to. -/// -/// [`Overseer`]: struct.Overseer.html -/// [`Subsystem`]: trait.Subsystem.html -pub trait Subsystem { - /// Start this `Subsystem` and return `SpawnedSubsystem`. - fn start(&mut self, ctx: SubsystemContext) -> SpawnedSubsystem; -} +/// A subsystem compatible with the overseer - one which can be run in the context of the +/// overseer. +pub type CompatibleSubsystem = Box> + Send>; /// A subsystem that we oversee. 
/// @@ -359,8 +313,8 @@ pub trait Subsystem { /// /// [`Subsystem`]: trait.Subsystem.html #[allow(dead_code)] -struct OverseenSubsystem { - subsystem: Box + Send>, +struct OverseenSubsystem { + subsystem: CompatibleSubsystem, instance: Option>, } @@ -441,16 +395,20 @@ where /// # use std::time::Duration; /// # use futures::{executor, pin_mut, select, FutureExt}; /// # use futures_timer::Delay; - /// # use polkadot_overseer::{ - /// # Overseer, Subsystem, SpawnedSubsystem, SubsystemContext, - /// # CandidateValidationMessage, CandidateBackingMessage, + /// # use polkadot_overseer::Overseer; + /// # use polkadot_subsystem::{ + /// # Subsystem, SpawnedSubsystem, SubsystemContext, + /// # messages::{CandidateValidationMessage, CandidateBackingMessage}, /// # }; /// /// struct ValidationSubsystem; - /// impl Subsystem for ValidationSubsystem { + /// + /// impl Subsystem for ValidationSubsystem + /// where C: SubsystemContext + /// { /// fn start( /// &mut self, - /// mut ctx: SubsystemContext, + /// mut ctx: C, /// ) -> SpawnedSubsystem { /// SpawnedSubsystem(Box::pin(async move { /// loop { @@ -461,10 +419,12 @@ where /// } /// /// struct CandidateBackingSubsystem; - /// impl Subsystem for CandidateBackingSubsystem { + /// impl Subsystem for CandidateBackingSubsystem + /// where C: SubsystemContext + /// { /// fn start( /// &mut self, - /// mut ctx: SubsystemContext, + /// mut ctx: C, /// ) -> SpawnedSubsystem { /// SpawnedSubsystem(Box::pin(async move { /// loop { @@ -498,8 +458,8 @@ where /// ``` pub fn new( leaves: impl IntoIterator, - validation: Box + Send>, - candidate_backing: Box + Send>, + validation: CompatibleSubsystem, + candidate_backing: CompatibleSubsystem, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -680,6 +640,12 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } + _ => { + // TODO: temporary catch-all until all subsystems are integrated with overseer. + // The overseer is not complete until this is an exhaustive match with all + // messages targeting an included subsystem. 
+ // https://github.com/paritytech/polkadot/issues/1317 + } } } @@ -688,15 +654,15 @@ where } } -fn spawn( +fn spawn( spawner: &mut S, futures: &mut FuturesUnordered>, streams: &mut StreamUnordered>, - mut s: Box + Send>, + mut s: CompatibleSubsystem, ) -> SubsystemResult> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); - let ctx = SubsystemContext::new(to_rx, from_tx); + let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx }; let f = s.start(ctx); let handle = spawner.spawn_with_handle(f.0)?; @@ -723,8 +689,10 @@ mod tests { struct TestSubsystem1(mpsc::Sender); - impl Subsystem for TestSubsystem1 { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem1 + where C: SubsystemContext + { + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { let mut i = 0; @@ -746,14 +714,16 @@ mod tests { struct TestSubsystem2(mpsc::Sender); - impl Subsystem for TestSubsystem2 { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem2 + where C: SubsystemContext + { + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { let mut c: usize = 0; loop { if c < 10 { let (tx, _) = oneshot::channel(); - ctx.send_msg( + ctx.send_message( AllMessages::CandidateValidation( CandidateValidationMessage::Validate( Default::default(), @@ -786,8 +756,10 @@ mod tests { struct TestSubsystem4; - impl Subsystem for TestSubsystem4 { - fn start(&mut self, mut _ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem4 + where C: SubsystemContext + { + fn start(&mut self, mut _ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { // Do nothing and exit. })) @@ -871,8 +843,10 @@ mod tests { struct TestSubsystem5(mpsc::Sender); - impl Subsystem for TestSubsystem5 { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem5 + where C: SubsystemContext + { + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { @@ -895,8 +869,10 @@ mod tests { struct TestSubsystem6(mpsc::Sender); - impl Subsystem for TestSubsystem6 { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + impl Subsystem for TestSubsystem6 + where C: SubsystemContext + { + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { diff --git a/node/primitives/Cargo.toml b/node/primitives/Cargo.toml index f317565b2e99..b2bc9231ae74 100644 --- a/node/primitives/Cargo.toml +++ b/node/primitives/Cargo.toml @@ -10,3 +10,4 @@ polkadot-primitives = { path = "../../primitives" } polkadot-statement-table = { path = "../../statement-table" } parity-scale-codec = { version = "1.3.0", default-features = false, features = ["derive"] } runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } +async-trait = "0.1" diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index bd43748ab24a..527e6aaea274 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -64,6 +64,7 @@ impl EncodeAs for Statement { pub type SignedFullStatement = Signed; /// A misbehaviour report. 
+#[derive(Debug)] pub enum MisbehaviorReport { /// These validator nodes disagree on this candidate's validity, please figure it out /// @@ -79,3 +80,12 @@ pub enum MisbehaviorReport { /// This peer has seconded more than one parachain candidate for this relay parent head DoubleVote(CandidateReceipt, SignedFullStatement, SignedFullStatement), } + +/// A unique identifier for a network protocol. +pub type ProtocolId = [u8; 4]; + +/// A succinct representation of a peer's view. This consists of a bounded amount of chain heads. +/// +/// Up to `N` (5?) chain heads. +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +pub struct View(pub Vec); diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 74069f0233af..f1a56acfad95 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -15,6 +15,7 @@ hex-literal = "0.2.1" polkadot-primitives = { path = "../../primitives" } polkadot-runtime = { path = "../../runtime/polkadot" } polkadot-overseer = { path = "../overseer" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } kusama-runtime = { path = "../../runtime/kusama" } westend-runtime = { path = "../../runtime/westend" } polkadot-network = { path = "../../network", optional = true } diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 620850b3bd64..9b917aba1b7a 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -29,10 +29,10 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use sc_executor::native_executor_instance; use log::info; use sp_blockchain::HeaderBackend; -use polkadot_overseer::{ - self as overseer, - BlockInfo, Overseer, OverseerHandler, Subsystem, SubsystemContext, SpawnedSubsystem, - CandidateValidationMessage, CandidateBackingMessage, +use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler}; +use polkadot_subsystem::{ + Subsystem, SubsystemContext, SpawnedSubsystem, + messages::{CandidateValidationMessage, CandidateBackingMessage}, }; pub use service::{ AbstractService, Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis, @@ -269,8 +269,10 @@ macro_rules! 
new_full_start { struct CandidateValidationSubsystem; -impl Subsystem for CandidateValidationSubsystem { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for CandidateValidationSubsystem + where C: SubsystemContext +{ + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { while let Ok(_) = ctx.recv().await {} })) @@ -279,8 +281,10 @@ impl Subsystem for CandidateValidationSubsystem { struct CandidateBackingSubsystem; -impl Subsystem for CandidateBackingSubsystem { - fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { +impl Subsystem for CandidateBackingSubsystem + where C: SubsystemContext +{ + fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { while let Ok(_) = ctx.recv().await {} })) diff --git a/node/messages/Cargo.toml b/node/subsystem/Cargo.toml similarity index 77% rename from node/messages/Cargo.toml rename to node/subsystem/Cargo.toml index 9edb5a051987..43712319cb71 100644 --- a/node/messages/Cargo.toml +++ b/node/subsystem/Cargo.toml @@ -1,9 +1,9 @@ [package] -name = "polkadot-node-messages" +name = "polkadot-node-subsystem" version = "0.1.0" authors = ["Parity Technologies "] edition = "2018" -description = "Message types used by Subsystems" +description = "Subsystem traits and message definitions" [dependencies] polkadot-primitives = { path = "../../primitives" } @@ -11,3 +11,4 @@ polkadot-statement-table = { path = "../../statement-table" } polkadot-node-primitives = { path = "../primitives" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.5" +async-trait = "0.1" diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs new file mode 100644 index 000000000000..31d094907f51 --- /dev/null +++ b/node/subsystem/src/lib.rs @@ -0,0 +1,150 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Subsystem trait definitions and message types. +//! +//! Node-side logic for Polkadot is mostly comprised of Subsystems, which are discrete components +//! that communicate via message-passing. They are coordinated by an overseer, provided by a +//! separate crate. + +use std::pin::Pin; + +use futures::prelude::*; +use futures::channel::{mpsc, oneshot}; +use futures::future::BoxFuture; + +use polkadot_primitives::Hash; +use async_trait::async_trait; + +use crate::messages::AllMessages; + +pub mod messages; + +/// Signals sent by an overseer to a subsystem. +#[derive(PartialEq, Clone, Debug)] +pub enum OverseerSignal { + /// `Subsystem` should start working on block-based work, given by the relay-chain block hash. + StartWork(Hash), + /// `Subsystem` should stop working on block-based work specified by the relay-chain block hash. + StopWork(Hash), + /// Conclude the work of the `Overseer` and all `Subsystem`s. 
+	Conclude,
+}
+
+/// A message type that a subsystem receives from an overseer.
+/// It wraps signals from an overseer and messages that are circulating
+/// between subsystems.
+///
+/// It is generic over the message type `M` that a particular `Subsystem` may use.
+#[derive(Debug)]
+pub enum FromOverseer<M> {
+	/// Signal from the `Overseer`.
+	Signal(OverseerSignal),
+
+	/// Some other `Subsystem`'s message.
+	Communication {
+		msg: M,
+	},
+}
+
+/// An error type that describes faults that may happen
+///
+/// These are:
+/// * Channels being closed
+/// * Subsystems dying when they are not expected to
+/// * Subsystems not dying when they are told to die
+/// * etc.
+#[derive(Debug)]
+pub struct SubsystemError;
+
+impl From<mpsc::SendError> for SubsystemError {
+	fn from(_: mpsc::SendError) -> Self {
+		Self
+	}
+}
+
+impl From<oneshot::Canceled> for SubsystemError {
+	fn from(_: oneshot::Canceled) -> Self {
+		Self
+	}
+}
+
+impl From<futures::task::SpawnError> for SubsystemError {
+	fn from(_: futures::task::SpawnError) -> Self {
+		Self
+	}
+}
+
+impl From<std::convert::Infallible> for SubsystemError {
+	fn from(e: std::convert::Infallible) -> Self {
+		match e {}
+	}
+}
+
+/// An asynchronous subsystem task.
+///
+/// In essence it's just a newtype wrapping a `BoxFuture`.
+pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
+
+/// A `Result` type that wraps [`SubsystemError`].
+///
+/// [`SubsystemError`]: struct.SubsystemError.html
+pub type SubsystemResult<T> = Result<T, SubsystemError>;
+
+/// A context type that is given to the [`Subsystem`] upon spawning.
+/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
+/// or spawn jobs.
+///
+/// [`Overseer`]: struct.Overseer.html
+/// [`SubsystemJob`]: trait.SubsystemJob.html
+#[async_trait]
+pub trait SubsystemContext: Send + 'static {
+	/// The message type of this context. Subsystems launched with this context will expect
+	/// to receive messages of this type.
+	type Message: Send;
+
+	/// Try to asynchronously receive a message.
+	///
+	/// This has to be used with caution, if you loop over this without
+	/// using `pending!()` macro you will end up with a busy loop!
+	async fn try_recv(&mut self) -> Result<Option<FromOverseer<Self::Message>>, ()>;
+
+	/// Receive a message.
+	async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>>;
+
+	/// Spawn a child task on the executor.
+	async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
+
+	/// Send a direct message to some other `Subsystem`, routed based on message type.
+	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>;
+
+	/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
+	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
+		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
+}
+
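+/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
+///
+/// It is generic over the message type circulating in the system.
+/// The idea is that we want some type containing persistent state that
+/// can spawn actually running subsystems when asked to.
+///
+/// A hedged sketch of a minimal implementor (`NoopSubsystem` is a hypothetical
+/// name used only for illustration):
+///
+/// ```ignore
+/// struct NoopSubsystem;
+///
+/// impl<C: SubsystemContext> Subsystem<C> for NoopSubsystem {
+///     fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
+///         SpawnedSubsystem(Box::pin(async move {
+///             // Drain incoming overseer messages until the channel closes.
+///             while let Ok(_) = ctx.recv().await {}
+///         }))
+///     }
+/// }
+/// ```
+///
+/// [`Overseer`]: struct.Overseer.html
+/// [`Subsystem`]: trait.Subsystem.html
+pub trait Subsystem<C: SubsystemContext> {
+	/// Start this `Subsystem` and return `SpawnedSubsystem`.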
+ fn start(&mut self, ctx: C) -> SpawnedSubsystem; +} diff --git a/node/messages/src/lib.rs b/node/subsystem/src/messages.rs similarity index 87% rename from node/messages/src/lib.rs rename to node/subsystem/src/messages.rs index 3a413f2c67bb..c22581349078 100644 --- a/node/messages/src/lib.rs +++ b/node/subsystem/src/messages.rs @@ -24,27 +24,16 @@ use futures::channel::{mpsc, oneshot}; -use sc_network::{ObservedRole, ReputationChange, PeerId, config::ProtocolId}; +use sc_network::{ObservedRole, ReputationChange, PeerId}; use polkadot_primitives::{BlockNumber, Hash, Signature}; use polkadot_primitives::parachain::{ AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId, SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, }; use polkadot_node_primitives::{ - MisbehaviorReport, SignedFullStatement, + MisbehaviorReport, SignedFullStatement, View, ProtocolId, }; -/// Signals sent by an overseer to a subsystem. -#[derive(PartialEq, Clone, Debug)] -pub enum OverseerSignal { - /// `Subsystem` should start working on block-based work, given by the relay-chain block hash. - StartWork(Hash), - /// `Subsystem` should stop working on block-based work specified by the relay-chain block hash. - StopWork(Hash), - /// Conclude the work of the `Overseer` and all `Subsystem`s. - Conclude, -} - /// A notification of a new backed candidate. #[derive(Debug)] pub struct NewBackedCandidate(pub BackedCandidate); @@ -90,12 +79,8 @@ pub enum CandidateValidationMessage { ), } -/// Chain heads. -/// -/// Up to `N` (5?) chain heads. -pub struct View(pub Vec); - /// Events from network. +#[derive(Debug, Clone)] pub enum NetworkBridgeEvent { /// A peer has connected. PeerConnected(PeerId, ObservedRole), @@ -114,7 +99,8 @@ pub enum NetworkBridgeEvent { } /// Messages received by the network bridge subsystem. -pub enum NetworkBridgeSubsystemMessage { +#[derive(Debug)] +pub enum NetworkBridgeMessage { /// Register an event producer on startup. RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), @@ -126,6 +112,7 @@ pub enum NetworkBridgeSubsystemMessage { } /// Availability Distribution Message. +#[derive(Debug)] pub enum AvailabilityDistributionMessage { /// Distribute an availability chunk to other validators. DistributeChunk(Hash, ErasureChunk), @@ -138,6 +125,7 @@ pub enum AvailabilityDistributionMessage { } /// Bitfield distribution message. +#[derive(Debug)] pub enum BitfieldDistributionMessage { /// Distribute a bitfield via gossip to other validators. DistributeBitfield(Hash, SignedAvailabilityBitfield), @@ -147,6 +135,7 @@ pub enum BitfieldDistributionMessage { } /// Availability store subsystem message. +#[derive(Debug)] pub enum AvailabilityStoreMessage { /// Query a `PoVBlock` from the AV store. QueryPoV(Hash, oneshot::Sender>), @@ -159,6 +148,7 @@ pub enum AvailabilityStoreMessage { } /// A request to the Runtime API subsystem. +#[derive(Debug)] pub enum RuntimeApiRequest { /// Get the current validator set. Validators(oneshot::Sender>), @@ -171,19 +161,24 @@ pub enum RuntimeApiRequest { } /// A message to the Runtime API subsystem. +#[derive(Debug)] pub enum RuntimeApiMessage { /// Make a request of the runtime API against the post-state of the given relay-parent. Request(Hash, RuntimeApiRequest), } /// Statement distribution message. +#[derive(Debug)] pub enum StatementDistributionMessage { /// We have originated a signed statement in the context of /// given relay-parent hash and it should be distributed to other validators. 
Share(Hash, SignedFullStatement), + /// Event from the network bridge. + NetworkBridgeUpdate(NetworkBridgeEvent), } /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block. +#[derive(Debug)] pub enum ProvisionableData { /// This bitfield indicates the availability of various candidate blocks. Bitfield(Hash, SignedAvailabilityBitfield), @@ -198,6 +193,7 @@ pub enum ProvisionableData { /// Message to the Provisioner. /// /// In all cases, the Hash is that of the relay parent. +#[derive(Debug)] pub enum ProvisionerMessage { /// This message allows potential block authors to be kept updated with all new authorship data /// as it becomes available. @@ -213,20 +209,18 @@ pub enum AllMessages { CandidateValidation(CandidateValidationMessage), /// Message for the candidate backing subsystem. CandidateBacking(CandidateBackingMessage), -} - -/// A message type that a subsystem receives from an overseer. -/// It wraps signals from an overseer and messages that are circulating -/// between subsystems. -/// -/// It is generic over over the message type `M` that a particular `Subsystem` may use. -#[derive(Debug)] -pub enum FromOverseer { - /// Signal from the `Overseer`. - Signal(OverseerSignal), - - /// Some other `Subsystem`'s message. - Communication { - msg: M, - }, + /// Message for the candidate selection subsystem. + CandidateSelection(CandidateSelectionMessage), + /// Message for the statement distribution subsystem. + StatementDistribution(StatementDistributionMessage), + /// Message for the availability distribution subsystem. + AvailabilityDistribution(AvailabilityDistributionMessage), + /// Message for the bitfield distribution subsystem. + BitfieldDistribution(BitfieldDistributionMessage), + /// Message for the Provisioner subsystem. + Provisioner(ProvisionerMessage), + /// Message for the Runtime API subsystem. + RuntimeApi(RuntimeApiMessage), + /// Message for the availability store subsystem. + AvailabilityStore(AvailabilityStoreMessage), } diff --git a/node/test-helpers/subsystem/Cargo.toml b/node/test-helpers/subsystem/Cargo.toml new file mode 100644 index 000000000000..0fc26a24ea14 --- /dev/null +++ b/node/test-helpers/subsystem/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "polkadot-subsystem-test-helpers" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" +description = "Helpers for testing subsystems" + +[dependencies] +futures = "0.3.5" +async-trait = "0.1" +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +parking_lot = "0.10.0" diff --git a/node/test-helpers/subsystem/src/lib.rs b/node/test-helpers/subsystem/src/lib.rs new file mode 100644 index 000000000000..c99a33c78d9b --- /dev/null +++ b/node/test-helpers/subsystem/src/lib.rs @@ -0,0 +1,229 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! 
Utilities for testing subsystems.
+
+use polkadot_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
+use polkadot_subsystem::messages::AllMessages;
+
+use futures::prelude::*;
+use futures::channel::mpsc;
+use futures::task::{Spawn, SpawnExt};
+use futures::poll;
+use parking_lot::Mutex;
+
+use std::convert::Infallible;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+enum SinkState<T> {
+	Empty {
+		read_waker: Option<Waker>,
+	},
+	Item {
+		item: T,
+		ready_waker: Option<Waker>,
+		flush_waker: Option<Waker>,
+	},
+}
+
+/// The sink half of a single-item sink that does not resolve until the item has been read.
+pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>);
+
+/// The stream half of a single-item sink.
+pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
+
+impl<T> Sink<T> for SingleItemSink<T> {
+	type Error = Infallible;
+
+	fn poll_ready(
+		self: Pin<&mut Self>,
+		cx: &mut Context,
+	) -> Poll<Result<(), Infallible>> {
+		let mut state = self.0.lock();
+		match *state {
+			SinkState::Empty { .. } => Poll::Ready(Ok(())),
+			SinkState::Item { ref mut ready_waker, .. } => {
+				*ready_waker = Some(cx.waker().clone());
+				Poll::Pending
+			}
+		}
+	}
+
+	fn start_send(
+		self: Pin<&mut Self>,
+		item: T,
+	) -> Result<(), Infallible> {
+		let mut state = self.0.lock();
+
+		match *state {
+			SinkState::Empty { ref mut read_waker } => {
+				if let Some(waker) = read_waker.take() {
+					waker.wake();
+				}
+			}
+			_ => panic!("start_send called outside of empty sink state ensured by poll_ready"),
+		}
+
+		*state = SinkState::Item {
+			item,
+			ready_waker: None,
+			flush_waker: None,
+		};
+
+		Ok(())
+	}
+
+	fn poll_flush(
+		self: Pin<&mut Self>,
+		cx: &mut Context,
+	) -> Poll<Result<(), Infallible>> {
+		let mut state = self.0.lock();
+		match *state {
+			SinkState::Empty { .. } => Poll::Ready(Ok(())),
+			SinkState::Item { ref mut flush_waker, .. } => {
+				*flush_waker = Some(cx.waker().clone());
+				Poll::Pending
+			}
+		}
+	}
+
+	fn poll_close(
+		self: Pin<&mut Self>,
+		cx: &mut Context,
+	) -> Poll<Result<(), Infallible>> {
+		self.poll_flush(cx)
+	}
+}
+
+impl<T> Stream for SingleItemStream<T> {
+	type Item = T;
+
+	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
+		let mut state = self.0.lock();
+
+		let read_waker = Some(cx.waker().clone());
+
+		match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
+			SinkState::Empty { .. } => Poll::Pending,
+			SinkState::Item { item, ready_waker, flush_waker } => {
+				if let Some(waker) = ready_waker {
+					waker.wake();
+				}
+
+				if let Some(waker) = flush_waker {
+					waker.wake();
+				}
+
+				Poll::Ready(Some(item))
+			}
+		}
+	}
+}
+
+/// Create a single-item Sink/Stream pair.
+///
+/// The sink's send methods resolve at the point at which the stream reads the item,
+/// not when the item is buffered.
+pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
+	let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
+	(
+		SingleItemSink(inner.clone()),
+		SingleItemStream(inner),
+	)
+}
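+
+// A hedged usage sketch (not part of the helper API): `send` on the sink half
+// resolves only once the stream half actually reads the item, so both ends must
+// be polled together. Module and test names here are illustrative only.
+#[cfg(test)]
+mod single_item_sink_example {
+	use super::*;
+
+	#[test]
+	fn send_resolves_when_item_is_read() {
+		let (mut sink, mut stream) = single_item_sink::<u32>();
+		futures::executor::block_on(async move {
+			// `join!` polls both halves; `send` alone would never resolve.
+			let (sent, received) = futures::join!(sink.send(1), stream.next());
+			assert!(sent.is_ok());
+			assert_eq!(received, Some(1));
+		});
+	}
+}
+
+/// A test subsystem context.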
+pub struct TestSubsystemContext<M, S> {
+	tx: mpsc::UnboundedSender<AllMessages>,
+	rx: SingleItemStream<FromOverseer<M>>,
+	spawn: S,
+}
+
+#[async_trait::async_trait]
+impl<M: Send + 'static, S: Spawn + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> {
+	type Message = M;
+
+	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
+		match poll!(self.rx.next()) {
+			Poll::Ready(Some(msg)) => Ok(Some(msg)),
+			Poll::Ready(None) => Err(()),
+			Poll::Pending => Ok(None),
+		}
+	}
+
+	async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
+		self.rx.next().await.ok_or(SubsystemError)
+	}
+
+	async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
+		self.spawn.spawn(s).map_err(Into::into)
+	}
+
+	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
+		self.tx.send(msg).await.expect("test overseer no longer live");
+		Ok(())
+	}
+
+	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
+		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
+	{
+		let mut iter = stream::iter(msgs.into_iter().map(Ok));
+		self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
+
+		Ok(())
+	}
+}
+
+/// A handle for interacting with the subsystem context.
+pub struct TestSubsystemContextHandle<M> {
+	tx: SingleItemSink<FromOverseer<M>>,
+	rx: mpsc::UnboundedReceiver<AllMessages>,
+}
+
+impl<M> TestSubsystemContextHandle<M> {
+	/// Send a message or signal to the subsystem. This resolves at the point in time where the
+	/// subsystem has _read_ the message.
+	pub async fn send(&mut self, from_overseer: FromOverseer<M>) {
+		self.tx.send(from_overseer).await.expect("Test subsystem no longer live");
+	}
+
+	/// Receive the next message from the subsystem.
+	pub async fn recv(&mut self) -> AllMessages {
+		self.rx.next().await.expect("Test subsystem no longer live")
+	}
+}
+
+/// Make a test subsystem context.
+pub fn make_subsystem_context<M, S>(spawn: S)
+	-> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>)
+{
+	let (overseer_tx, overseer_rx) = single_item_sink();
+	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
+
+	(
+		TestSubsystemContext {
+			tx: all_messages_tx,
+			rx: overseer_rx,
+			spawn,
+		},
+		TestSubsystemContextHandle {
+			tx: overseer_tx,
+			rx: all_messages_rx
+		},
+	)
+}
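+
+// A hedged end-to-end sketch (not part of the helper API): wire a dummy message
+// type through `make_subsystem_context` and check that an overseer signal is
+// delivered. The module and test names are illustrative only.
+#[cfg(test)]
+mod make_subsystem_context_example {
+	use super::*;
+	use polkadot_subsystem::OverseerSignal;
+	use futures::executor::{self, ThreadPool};
+
+	#[test]
+	fn context_delivers_overseer_signals() {
+		let pool = ThreadPool::new().unwrap();
+		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool);
+
+		executor::block_on(async move {
+			// `handle.send` resolves only once the subsystem side reads, so poll both.
+			let send = handle.send(FromOverseer::Signal(OverseerSignal::Conclude));
+			let recv = ctx.recv();
+			let (_, received) = futures::join!(send, recv);
+
+			match received {
+				Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {}
+				other => panic!("expected `Conclude`, got {:?}", other),
+			}
+		});
+	}
+}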