diff --git a/Cargo.lock b/Cargo.lock index f11fdc9486..d74d611e88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -786,6 +786,7 @@ dependencies = [ name = "cirrus-client-executor" version = "0.1.0" dependencies = [ + "cirrus-client-executor-gossip", "cirrus-node-primitives", "cirrus-primitives", "cumulus-client-consensus-common", @@ -799,6 +800,7 @@ dependencies = [ "rand_chacha 0.3.1", "sc-client-api", "sc-transaction-pool-api", + "sc-utils", "sp-api", "sp-blockchain", "sp-consensus", @@ -812,11 +814,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "cirrus-client-executor-gossip" +version = "0.1.0" +dependencies = [ + "futures 0.3.17", + "parity-scale-codec", + "parking_lot", + "sc-network", + "sc-network-gossip", + "sc-utils", + "sp-core", + "sp-executor", + "sp-runtime", + "tracing", +] + [[package]] name = "cirrus-client-service" version = "0.1.0" dependencies = [ "cirrus-client-executor", + "cirrus-client-executor-gossip", "cirrus-node-primitives", "cirrus-primitives", "cumulus-client-consensus-common", @@ -825,10 +844,12 @@ dependencies = [ "sc-chain-spec", "sc-client-api", "sc-consensus", + "sc-network", "sc-service", "sc-telemetry", "sc-tracing", "sc-transaction-pool-api", + "sc-utils", "sp-api", "sp-blockchain", "sp-consensus", @@ -4552,6 +4573,7 @@ name = "parachain-template-node" version = "0.1.0" dependencies = [ "cirrus-client-executor", + "cirrus-client-executor-gossip", "cirrus-client-service", "cirrus-primitives", "cumulus-client-cli", @@ -6326,6 +6348,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "sc-network-gossip" +version = "0.10.0-dev" +source = "git+https://github.com/paritytech/substrate?rev=5bd5b842d4ea520d281b1398e1f54907c9862fcd#5bd5b842d4ea520d281b1398e1f54907c9862fcd" +dependencies = [ + "futures 0.3.17", + "futures-timer 3.0.2", + "libp2p", + "log", + "lru 0.7.0", + "sc-network", + "sp-runtime", + "substrate-prometheus-endpoint", + "tracing", +] + [[package]] name = "sc-network-test" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 23a6fcff9c..f3d7080fad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "cumulus/client/cirrus-executor", "cumulus/client/consensus/common", "cumulus/client/consensus/relay-chain", + "cumulus/client/executor-gossip", "cumulus/parachain-template/node", "cumulus/parachain-template/runtime", "cumulus/primitives", diff --git a/crates/cirrus-node-primitives/src/lib.rs b/crates/cirrus-node-primitives/src/lib.rs index 45dc0fb5b4..4dcb3c8395 100644 --- a/crates/cirrus-node-primitives/src/lib.rs +++ b/crates/cirrus-node-primitives/src/lib.rs @@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize}; use sp_application_crypto::KeyTypeId; use sp_consensus_slots::Slot; use sp_core::bytes; -use sp_executor::{ExecutionReceipt, OpaqueBundle}; +use sp_executor::{OpaqueBundle, OpaqueExecutionReceipt}; use sp_runtime::traits::Hash as HashT; use std::pin::Pin; use subspace_core_primitives::{Randomness, Tag}; @@ -94,14 +94,14 @@ impl BundleResult { } /// Result of the [`ProcessorFn`] invocation. -pub struct ProcessorResult { - /// The execution receipt that was built. - pub execution_receipt: ExecutionReceipt, +pub struct ProcessorResult { + /// The opaque execution receipt that was built. + pub opaque_execution_receipt: OpaqueExecutionReceipt, } impl ProcessorResult { - pub fn to_execution_receipt(self) -> ExecutionReceipt { - self.execution_receipt + pub fn to_opaque_execution_receipt(self) -> OpaqueExecutionReceipt { + self.opaque_execution_receipt } } diff --git a/crates/pallet-executor/src/lib.rs b/crates/pallet-executor/src/lib.rs index a8b8dd92b3..c4f0fdf37f 100644 --- a/crates/pallet-executor/src/lib.rs +++ b/crates/pallet-executor/src/lib.rs @@ -58,7 +58,7 @@ mod pallet { head_hash: T::Hash, }, /// A new candidate receipt was backed. - ExecutionReceiptStored { receipt_hash: T::Hash }, + ExecutionReceiptStored { receipt_hash: H256 }, /// A transaction bundle was included. TransactionBundleStored { bundle_hash: H256 }, /// A fraud proof was processed. @@ -224,6 +224,7 @@ mod pallet { .build() } Call::submit_fraud_proof { fraud_proof } => { + // TODO: prevent the spamming of fraud proof transaction. if let Err(e) = Self::check_fraud_proof(fraud_proof) { log::error!( target: "runtime::subspace::executor", diff --git a/crates/sp-executor/src/lib.rs b/crates/sp-executor/src/lib.rs index 507409ce11..d2a3f0b647 100644 --- a/crates/sp-executor/src/lib.rs +++ b/crates/sp-executor/src/lib.rs @@ -95,7 +95,7 @@ impl From> #[derive(Decode, Encode, TypeInfo, PartialEq, Eq, Clone, RuntimeDebug)] pub struct ExecutionReceipt { /// Primary block hash. - pub primary_hash: Hash, + pub primary_hash: H256, /// Secondary block hash? pub secondary_hash: Hash, /// State root after finishing the execution. @@ -104,10 +104,21 @@ pub struct ExecutionReceipt { pub state_transition_root: Hash, } -impl ExecutionReceipt { - /// TODO: hash of ER? - pub fn hash(&self) -> Hash { - self.primary_hash +impl ExecutionReceipt { + /// Returns the hash of this execution receipt. + pub fn hash(&self) -> H256 { + BlakeTwo256::hash_of(self) + } +} + +// TODO: this might be unneccessary, ideally we could interact with the runtime using `ExecutionReceipt` directly. +// Refer to the comment https://github.com/subspace/subspace/pull/219#discussion_r776749767 +#[derive(Decode, Encode, TypeInfo, PartialEq, Eq, Clone, RuntimeDebug)] +pub struct OpaqueExecutionReceipt(Vec); + +impl From> for OpaqueExecutionReceipt { + fn from(inner: ExecutionReceipt) -> Self { + Self(inner.encode()) } } @@ -129,7 +140,7 @@ sp_api::decl_runtime_apis! { /// Submits the execution receipt via an unsigned extrinsic. fn submit_execution_receipt_unsigned( - execution_receipt: ExecutionReceipt<::Hash>, + opaque_execution_receipt: OpaqueExecutionReceipt, ) -> Option<()>; /// Submits the transaction bundle via an unsigned extrinsic. diff --git a/crates/subspace-runtime/src/lib.rs b/crates/subspace-runtime/src/lib.rs index ef851e8386..e0dccb064e 100644 --- a/crates/subspace-runtime/src/lib.rs +++ b/crates/subspace-runtime/src/lib.rs @@ -891,9 +891,15 @@ impl_runtime_apis! { } fn submit_execution_receipt_unsigned( - execution_receipt: sp_executor::ExecutionReceipt<::Hash>, + opaque_execution_receipt: sp_executor::OpaqueExecutionReceipt, ) -> Option<()> { - Executor::submit_execution_receipt_unsigned(execution_receipt).ok() + ::Hash>>::decode( + &mut opaque_execution_receipt.encode().as_slice(), + ) + .ok() + .and_then(|execution_receipt| { + Executor::submit_execution_receipt_unsigned(execution_receipt).ok() + }) } fn submit_transaction_bundle_unsigned(opaque_bundle: OpaqueBundle) -> Option<()> { diff --git a/cumulus/client/cirrus-executor/Cargo.toml b/cumulus/client/cirrus-executor/Cargo.toml index d936cc8ad4..2720df7b9f 100644 --- a/cumulus/client/cirrus-executor/Cargo.toml +++ b/cumulus/client/cirrus-executor/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" # Substrate dependencies sc-client-api = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } +sc-utils = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sp-runtime = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sp-core = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sp-consensus = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } @@ -30,6 +31,7 @@ tracing = "0.1.25" polkadot-overseer = { path = "../../../polkadot/node/overseer" } polkadot-node-subsystem = { path = "../../../polkadot/node/subsystem" } +cirrus-client-executor-gossip = { path = "../executor-gossip" } cirrus-node-primitives = { path = "../../../crates/cirrus-node-primitives" } cirrus-primitives = { path = "../../primitives" } sp-executor = { path = "../../../crates/sp-executor" } diff --git a/cumulus/client/cirrus-executor/src/lib.rs b/cumulus/client/cirrus-executor/src/lib.rs index 57347d17d4..c67deee35c 100644 --- a/cumulus/client/cirrus-executor/src/lib.rs +++ b/cumulus/client/cirrus-executor/src/lib.rs @@ -21,6 +21,7 @@ mod processor; use sc_client_api::BlockBackend; use sc_transaction_pool_api::InPoolTransaction; +use sc_utils::mpsc::TracingUnboundedSender; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_consensus::BlockStatus; @@ -37,6 +38,7 @@ use cumulus_client_consensus_common::ParachainConsensus; use polkadot_node_subsystem::messages::CollationGenerationMessage; use polkadot_overseer::Handle as OverseerHandle; +use cirrus_client_executor_gossip::{Action, GossipMessageHandler}; use cirrus_node_primitives::{ BundleResult, Collation, CollationGenerationConfig, CollationResult, CollatorPair, ExecutorSlotInfo, HeadData, PersistedValidationData, ProcessorResult, @@ -60,8 +62,11 @@ pub struct Executor { parachain_consensus: Box>, runtime_api: Arc, client: Arc, + spawner: Arc, overseer_handle: OverseerHandle, transaction_pool: Arc, + bundle_sender: Arc>>, + execution_receipt_sender: Arc>>, } impl Clone @@ -73,8 +78,11 @@ impl Clone parachain_consensus: self.parachain_consensus.clone(), runtime_api: self.runtime_api.clone(), client: self.client.clone(), + spawner: self.spawner.clone(), overseer_handle: self.overseer_handle.clone(), transaction_pool: self.transaction_pool.clone(), + bundle_sender: self.bundle_sender.clone(), + execution_receipt_sender: self.execution_receipt_sender.clone(), } } } @@ -94,16 +102,22 @@ where runtime_api: Arc, parachain_consensus: Box>, client: Arc, + spawner: Arc, overseer_handle: OverseerHandle, transaction_pool: Arc, + bundle_sender: Arc>>, + execution_receipt_sender: Arc>>, ) -> Self { Self { block_status, runtime_api, parachain_consensus, client, + spawner, overseer_handle, transaction_pool, + bundle_sender, + execution_receipt_sender, } } @@ -169,32 +183,6 @@ where } } - /// Checks the execution receipt from the executor peers. - /// - /// TODO: invoke this once the external ER is received. - #[allow(unused)] - async fn on_execution_receipt_received( - &mut self, - _execution_receipt: ExecutionReceipt<::Hash>, - ) { - // TODO: validate the Proof-of-Election - - // TODO: check if the received ER is same with the one produced locally. - let same_with_produced_locally = true; - - if same_with_produced_locally { - // TODO: rebroadcast ER - } else { - // TODO: generate a fraud proof - let fraud_proof = FraudProof { proof: StorageProof::empty() }; - - // TODO: gossip the fraud proof to farmers - self.overseer_handle - .send_msg(CollationGenerationMessage::FraudProof(fraud_proof), "SubmitFraudProof") - .await; - } - } - async fn produce_candidate( mut self, relay_parent: PHash, @@ -264,11 +252,6 @@ where Some(CollationResult { collation: Collation { head_data, number }, result_sender: None }) } - // TODO: - // - gossip the bundle to the executor peers - // - OnBundleReceivedBySecondaryNode - // - OnBundleEquivocationProof(farmer only) - // - OnInvalidBundleProof(farmer only) async fn produce_bundle(self, slot_info: ExecutorSlotInfo) -> Option { println!("TODO: solve some puzzle based on `slot_info` to be allowed to produce a bundle"); @@ -321,6 +304,10 @@ where extrinsics, }; + if let Err(e) = self.bundle_sender.unbounded_send(bundle.clone()) { + tracing::error!(target: LOG_TARGET, error = ?e, "Failed to send transaction bundle"); + } + Some(BundleResult { opaque_bundle: bundle.into() }) } @@ -334,6 +321,91 @@ where } } +// TODO: proper error type +#[derive(Debug)] +pub struct GossipMessageError; + +impl GossipMessageHandler + for Executor +where + Block: BlockT, + Client: sp_blockchain::HeaderBackend, + BS: BlockBackend + Send + Sync, + RA: ProvideRuntimeApi + Send + Sync, + RA::Api: SecondaryApi, + TransactionPool: sc_transaction_pool_api::TransactionPool, +{ + type Error = GossipMessageError; + + fn on_bundle(&self, bundle: &Bundle) -> Result { + // TODO: check bundle equivocation + + let bundle_exists = false; + + if bundle_exists { + Ok(Action::Empty) + } else { + // TODO: validate the PoE + + for extrinsic in bundle.extrinsics.iter() { + let tx_hash = self.transaction_pool.hash_of(extrinsic); + + if self.transaction_pool.ready_transaction(&tx_hash).is_some() { + // TODO: Set the status of each tx in the bundle to seen + } else { + // TODO: check the legality + // + // if illegal => illegal tx proof + } + } + + // TODO: all checks pass, add to the bundle pool + + Ok(Action::RebroadcastBundle) + } + } + + /// Checks the execution receipt from the executor peers. + fn on_execution_receipt( + &self, + _execution_receipt: &ExecutionReceipt<::Hash>, + ) -> Result { + // TODO: validate the Proof-of-Election + + // TODO: check if the received ER is same with the one produced locally. + let same_with_produced_locally = true; + + if same_with_produced_locally { + Ok(Action::RebroadcastExecutionReceipt) + } else { + // TODO: generate a fraud proof + let fraud_proof = FraudProof { proof: StorageProof::empty() }; + + let mut overseer_handle = self.overseer_handle.clone(); + self.spawner.spawn( + "cirrus-submit-fraud-proof", + None, + async move { + tracing::debug!( + target: LOG_TARGET, + "Submitting fraud proof in a background task..." + ); + overseer_handle + .send_msg( + CollationGenerationMessage::FraudProof(fraud_proof), + "SubmitFraudProof", + ) + .await; + tracing::debug!(target: LOG_TARGET, "Fraud proof submission finished"); + } + .boxed(), + ); + + Ok(Action::Empty) + } + } +} + /// Parameters for [`start_executor`]. pub struct StartExecutorParams { pub client: Arc, @@ -345,6 +417,8 @@ pub struct StartExecutorParams>, pub transaction_pool: Arc, + pub bundle_sender: TracingUnboundedSender>, + pub execution_receipt_sender: TracingUnboundedSender>, } /// Start the executor. @@ -354,13 +428,16 @@ pub async fn start_executor( block_status, announce_block: _, mut overseer_handle, - spawner: _, + spawner, key, parachain_consensus, runtime_api, transaction_pool, + bundle_sender, + execution_receipt_sender, }: StartExecutorParams, -) where +) -> Executor +where Block: BlockT, BS: BlockBackend + Send + Sync + 'static, Spawner: SpawnNamed + Clone + Send + Sync + 'static, @@ -375,41 +452,53 @@ pub async fn start_executor( runtime_api, parachain_consensus, client, + Arc::new(spawner), overseer_handle.clone(), transaction_pool, + Arc::new(bundle_sender), + Arc::new(execution_receipt_sender), ); let span = tracing::Span::current(); - let collator_clone = executor.clone(); - let bundler_clone = executor.clone(); - let collator_span_clone = span.clone(); - let bundler_span_clone = span.clone(); let config = CollationGenerationConfig { key, - collator: Box::new(move |relay_parent, validation_data| { - let collator = collator_clone.clone(); - - collator - .produce_candidate(relay_parent, validation_data.clone()) - .instrument(collator_span_clone.clone()) - .boxed() - }), - bundler: Box::new(move |slot_info| { - let bundler = bundler_clone.clone(); - - bundler.produce_bundle(slot_info).instrument(bundler_span_clone.clone()).boxed() - }), - processor: Box::new(move |primary_hash, bundles, shuffling_seed| { - let processor = executor.clone(); - - processor - .process_bundles(primary_hash, bundles, shuffling_seed) - .instrument(span.clone()) - .boxed() - }), + collator: { + let executor = executor.clone(); + let span = span.clone(); + + Box::new(move |relay_parent, validation_data| { + let executor = executor.clone(); + executor + .produce_candidate(relay_parent, validation_data.clone()) + .instrument(span.clone()) + .boxed() + }) + }, + bundler: { + let executor = executor.clone(); + let span = span.clone(); + + Box::new(move |slot_info| { + let executor = executor.clone(); + executor.produce_bundle(slot_info).instrument(span.clone()).boxed() + }) + }, + processor: { + let executor = executor.clone(); + + Box::new(move |primary_hash, bundles, shuffling_seed| { + let executor = executor.clone(); + executor + .process_bundles(primary_hash, bundles, shuffling_seed) + .instrument(span.clone()) + .boxed() + }) + }, }; overseer_handle .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator") .await; + + executor } diff --git a/cumulus/client/cirrus-executor/src/processor.rs b/cumulus/client/cirrus-executor/src/processor.rs index 2bed6a9819..da327c86db 100644 --- a/cumulus/client/cirrus-executor/src/processor.rs +++ b/cumulus/client/cirrus-executor/src/processor.rs @@ -138,6 +138,12 @@ where // - apply each tx one by one. // - compute the incremental state root and add to the execution trace // - produce ExecutionReceipt + let execution_receipt = ExecutionReceipt { + primary_hash, + secondary_hash: Block::Hash::default(), + state_root: Block::Hash::default(), + state_transition_root: Block::Hash::default(), + }; // The applied txs can be fully removed from the transaction pool @@ -145,17 +151,13 @@ where let is_elected = true; if is_elected { - // TODO: broadcast ER to all executors. + if let Err(e) = self.execution_receipt_sender.unbounded_send(execution_receipt.clone()) + { + tracing::error!(target: LOG_TARGET, error = ?e, "Failed to send execution receipt"); + } // Return `Some(_)` to broadcast ER to all farmers via unsigned extrinsic. - Some(ProcessorResult { - execution_receipt: ExecutionReceipt { - primary_hash, - secondary_hash: Default::default(), - state_root: Default::default(), - state_transition_root: Default::default(), - }, - }) + Some(ProcessorResult { opaque_execution_receipt: execution_receipt.into() }) } else { None } diff --git a/cumulus/client/cirrus-service/Cargo.toml b/cumulus/client/cirrus-service/Cargo.toml index bb9e9272e4..ce2ea24382 100644 --- a/cumulus/client/cirrus-service/Cargo.toml +++ b/cumulus/client/cirrus-service/Cargo.toml @@ -11,11 +11,13 @@ cumulus-client-consensus-common = { path = "../consensus/common" } # Substrate dependencies sc-chain-spec = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sc-client-api = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } +sc-consensus = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } +sc-network = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sc-service = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sc-telemetry = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sc-tracing = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } -sc-consensus = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } +sc-utils = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sp-consensus = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sp-runtime = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } sp-api = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } @@ -29,6 +31,7 @@ parking_lot = "0.11.2" # Cirrus cirrus-client-executor = { path = "../cirrus-executor" } +cirrus-client-executor-gossip = { path = "../executor-gossip" } cirrus-node-primitives = { path = "../../../crates/cirrus-node-primitives" } cirrus-primitives = { path = "../../primitives" } diff --git a/cumulus/client/cirrus-service/src/lib.rs b/cumulus/client/cirrus-service/src/lib.rs index 5713830731..4f5084b13b 100644 --- a/cumulus/client/cirrus-service/src/lib.rs +++ b/cumulus/client/cirrus-service/src/lib.rs @@ -29,8 +29,10 @@ use sc_consensus::{ import_queue::{ImportQueue, IncomingBlock, Link, Origin}, BlockImport, }; +use sc_network::NetworkService; use sc_service::{Configuration, Role, TaskManager}; use sc_transaction_pool_api::TransactionPool; +use sc_utils::mpsc::tracing_unbounded; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_consensus::BlockOrigin; @@ -75,6 +77,7 @@ pub struct StartExecutorParams<'a, Block: BlockT, BS, Client, Spawner, RClient, pub parachain_consensus: Box>, pub import_queue: IQ, pub transaction_pool: Arc, + pub network: Arc>, } /// Start an executor node. @@ -89,6 +92,7 @@ pub async fn start_executor<'a, Block, BS, Client, Backend, Spawner, RClient, IQ parachain_consensus, import_queue: _, transaction_pool, + network, }: StartExecutorParams<'a, Block, BS, Client, Spawner, RClient, IQ, TP>, ) -> sc_service::error::Result<()> where @@ -120,21 +124,42 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); - cirrus_client_executor::start_executor(cirrus_client_executor::StartExecutorParams { - runtime_api: client.clone(), - client, - block_status, - announce_block, - overseer_handle: primary_chain_full_node - .overseer_handle - .clone() - .ok_or_else(|| "Subspace full node did not provide an `OverseerHandle`!")?, - spawner, - key: primary_chain_full_node.collator_key.clone(), - parachain_consensus, - transaction_pool, - }) - .await; + let (bundle_sender, bundle_receiver) = tracing_unbounded("transaction_bundle_stream"); + let (execution_receipt_sender, execution_receipt_receiver) = + tracing_unbounded("execution_receipt_stream"); + + let overseer_handle = primary_chain_full_node + .overseer_handle + .clone() + .ok_or_else(|| "Subspace full node did not provide an `OverseerHandle`!")?; + + let executor = + cirrus_client_executor::start_executor(cirrus_client_executor::StartExecutorParams { + runtime_api: client.clone(), + client, + block_status, + announce_block, + overseer_handle, + spawner, + key: primary_chain_full_node.collator_key.clone(), + parachain_consensus, + transaction_pool, + bundle_sender, + execution_receipt_sender, + }) + .await; + + let executor_gossip = cirrus_client_executor_gossip::start_gossip_worker( + cirrus_client_executor_gossip::ExecutorGossipParams { + network, + executor, + bundle_receiver, + execution_receipt_receiver, + }, + ); + task_manager + .spawn_essential_handle() + .spawn_blocking("cirrus-gossip", None, executor_gossip); task_manager.add_child(primary_chain_full_node.primary_chain_full_node.task_manager); diff --git a/cumulus/client/executor-gossip/Cargo.toml b/cumulus/client/executor-gossip/Cargo.toml new file mode 100644 index 0000000000..805c43e397 --- /dev/null +++ b/cumulus/client/executor-gossip/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "cirrus-client-executor-gossip" +version = "0.1.0" +authors = ["Liu-Cheng Xu "] +edition = "2021" + +[dependencies] +# Substrate dependencies +sc-network = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } +sc-network-gossip = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } +sc-utils = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } +sp-core = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } +sp-runtime = { git = "https://github.com/paritytech/substrate", rev = "5bd5b842d4ea520d281b1398e1f54907c9862fcd" } + +futures = "0.3.17" +parity-scale-codec = { version = "2.3.0", features = ["derive"] } +parking_lot = "0.11.2" +tracing = "0.1" + +sp-executor = { path = "../../../crates/sp-executor" } diff --git a/cumulus/client/executor-gossip/src/lib.rs b/cumulus/client/executor-gossip/src/lib.rs new file mode 100644 index 0000000000..cdb4729db5 --- /dev/null +++ b/cumulus/client/executor-gossip/src/lib.rs @@ -0,0 +1,291 @@ +#![allow(clippy::all)] + +mod worker; + +use self::worker::GossipWorker; +use parity_scale_codec::{Decode, Encode}; +use parking_lot::{Mutex, RwLock}; +use sc_network::{ObservedRole, PeerId}; +use sc_network_gossip::{ + GossipEngine, MessageIntent, Network as GossipNetwork, ValidationResult, Validator, + ValidatorContext, +}; +use sc_utils::mpsc::TracingUnboundedReceiver; +use sp_core::hashing::twox_64; +use sp_executor::{Bundle, ExecutionReceipt}; +use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; +use std::{ + collections::HashSet, + fmt::Debug, + sync::Arc, + time::{Duration, Instant}, +}; + +const LOG_TARGET: &str = "gossip::executor"; + +const EXECUTOR_PROTOCOL_NAME: &str = "/subspace/executor/1"; + +// TODO: proper timeout +/// Timeout for rebroadcasting messages. +/// The default value used in network-gossip is 1100ms. +const REBROADCAST_AFTER: Duration = Duration::from_secs(6); + +type MessageHash = [u8; 8]; + +/// Returns the configuration value to put in [`sc_network::config::NetworkConfiguration::extra_sets`]. +pub fn executor_gossip_peers_set_config() -> sc_network::config::NonDefaultSetConfig { + let mut cfg = + sc_network::config::NonDefaultSetConfig::new(EXECUTOR_PROTOCOL_NAME.into(), 1024 * 1024); + cfg.allow_non_reserved(25, 25); + cfg +} + +/// Gossip engine messages topic. +fn topic() -> Block::Hash { + <::Hashing as HashT>::hash(b"executor") +} + +/// Executor gossip message type. +/// +/// This is the root type that gets encoded and sent on the network. +#[derive(Debug, Encode, Decode)] +pub enum GossipMessage { + Bundle(Bundle), + ExecutionReceipt(ExecutionReceipt), +} + +impl From> for GossipMessage { + fn from(bundle: Bundle) -> Self { + Self::Bundle(bundle) + } +} + +impl From> for GossipMessage { + fn from(execution_receipt: ExecutionReceipt) -> Self { + Self::ExecutionReceipt(execution_receipt) + } +} + +/// What to do with the successfully verified gossip message. +#[derive(Debug)] +pub enum Action { + /// The message does not have to be re-gossiped. + Empty, + /// Gossip the bundle message to other peers. + RebroadcastBundle, + /// Gossip the execution exceipt message to other peers. + RebroadcastExecutionReceipt, +} + +impl Action { + fn rebroadcast_bundle(&self) -> bool { + matches!(self, Self::RebroadcastBundle) + } + + fn rebroadcast_execution_receipt(&self) -> bool { + matches!(self, Self::RebroadcastExecutionReceipt) + } +} + +/// Handler for the messages received from the executor gossip network. +pub trait GossipMessageHandler { + /// Error type. + type Error: Debug; + + /// Validates and applies when a transaction bundle was received. + fn on_bundle(&self, bundle: &Bundle) -> Result; + + /// Validates and applies when an execution receipt was received. + fn on_execution_receipt( + &self, + execution_receipt: &ExecutionReceipt, + ) -> Result; +} + +/// Validator for the gossip messages. +pub struct GossipValidator { + topic: Block::Hash, + executor: Executor, + next_rebroadcast: Mutex, + known_rebroadcasted: RwLock>, +} + +impl> GossipValidator { + pub fn new(executor: Executor) -> Self { + Self { + topic: topic::(), + executor, + next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), + known_rebroadcasted: RwLock::new(HashSet::new()), + } + } + + pub(crate) fn note_rebroadcasted(&self, encoded_message: &[u8]) { + let mut known_rebroadcasted = self.known_rebroadcasted.write(); + known_rebroadcasted.insert(twox_64(encoded_message)); + } + + fn validate_message(&self, msg: GossipMessage) -> ValidationResult { + match msg { + GossipMessage::Bundle(bundle) => { + let outcome = self.executor.on_bundle(&bundle); + match outcome { + Ok(action) if action.rebroadcast_bundle() => + ValidationResult::ProcessAndKeep(self.topic), + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + ?err, + "Invalid GossipMessage::Bundle discarded" + ); + ValidationResult::Discard + }, + _ => ValidationResult::ProcessAndDiscard(self.topic), + } + }, + GossipMessage::ExecutionReceipt(execution_receipt) => { + let outcome = self.executor.on_execution_receipt(&execution_receipt); + match outcome { + Ok(action) if action.rebroadcast_execution_receipt() => + ValidationResult::ProcessAndKeep(self.topic), + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + ?err, + "Invalid GossipMessage::ExecutionReceipt discarded" + ); + ValidationResult::Discard + }, + _ => ValidationResult::ProcessAndDiscard(self.topic), + } + }, + } + } +} + +impl + Send + Sync> Validator + for GossipValidator +{ + fn new_peer( + &self, + _context: &mut dyn ValidatorContext, + _who: &PeerId, + _role: ObservedRole, + ) { + } + + fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, _who: &PeerId) {} + + fn validate( + &self, + _context: &mut dyn ValidatorContext, + _sender: &PeerId, + mut data: &[u8], + ) -> ValidationResult { + match GossipMessage::::decode(&mut data) { + Ok(msg) => { + tracing::debug!(target: LOG_TARGET, ?msg, "Validating incoming message"); + self.validate_message(msg) + }, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + ?err, + ?data, + "Message discarded due to the decoding error" + ); + ValidationResult::Discard + }, + } + } + + /// Produce a closure for validating messages on a given topic. + /// + /// The gossip engine will periodically prune old or no longer relevant messages using + /// `message_expired`. + fn message_expired<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_topic, mut data| { + let msg_hash = twox_64(data); + // TODO: can be expired due to the message itself might be too old? + let _msg = match GossipMessage::::decode(&mut data) { + Ok(msg) => msg, + Err(_) => return true, + }; + let expired = { + let known_rebroadcasted = self.known_rebroadcasted.read(); + known_rebroadcasted.contains(&msg_hash) + }; + if expired { + let mut known_rebroadcasted = self.known_rebroadcasted.write(); + known_rebroadcasted.remove(&msg_hash); + } + expired + }) + } + + /// Produce a closure for filtering egress messages. + /// + /// Called before actually sending a message to a peer. + fn message_allowed<'a>( + &'a self, + ) -> Box bool + 'a> { + let do_rebroadcast = { + let now = Instant::now(); + let mut next_rebroadcast = self.next_rebroadcast.lock(); + if now >= *next_rebroadcast { + *next_rebroadcast = now + REBROADCAST_AFTER; + true + } else { + false + } + }; + + Box::new(move |_who, intent, _topic, mut data| { + if let MessageIntent::PeriodicRebroadcast = intent { + return do_rebroadcast + } + + match GossipMessage::::decode(&mut data) { + Ok(_) => true, + Err(_) => false, + } + }) + } +} + +/// Parameters to run the executor gossip service. +pub struct ExecutorGossipParams { + /// Substrate network service. + pub network: Network, + /// Executor instance. + pub executor: Executor, + /// Stream of transaction bundle produced locally. + pub bundle_receiver: TracingUnboundedReceiver>, + /// Stream of execution receipt produced locally. + pub execution_receipt_receiver: TracingUnboundedReceiver>, +} + +/// Starts the executor gossip worker. +pub async fn start_gossip_worker( + gossip_params: ExecutorGossipParams, +) where + Block: BlockT, + Network: GossipNetwork + Send + Sync + Clone + 'static, + Executor: GossipMessageHandler + Send + Sync + 'static, +{ + let ExecutorGossipParams { network, executor, bundle_receiver, execution_receipt_receiver } = + gossip_params; + + let gossip_validator = Arc::new(GossipValidator::new(executor)); + let gossip_engine = + GossipEngine::new(network, EXECUTOR_PROTOCOL_NAME, gossip_validator.clone(), None); + + let gossip_worker = GossipWorker::new( + gossip_validator, + Arc::new(Mutex::new(gossip_engine)), + bundle_receiver, + execution_receipt_receiver, + ); + + gossip_worker.run().await +} diff --git a/cumulus/client/executor-gossip/src/worker.rs b/cumulus/client/executor-gossip/src/worker.rs new file mode 100644 index 0000000000..ed82103120 --- /dev/null +++ b/cumulus/client/executor-gossip/src/worker.rs @@ -0,0 +1,88 @@ +use crate::{topic, GossipMessage, GossipMessageHandler, GossipValidator, LOG_TARGET}; +use futures::{future, FutureExt, StreamExt}; +use parity_scale_codec::{Decode, Encode}; +use parking_lot::Mutex; +use sc_network_gossip::GossipEngine; +use sc_utils::mpsc::TracingUnboundedReceiver; +use sp_executor::{Bundle, ExecutionReceipt}; +use sp_runtime::traits::Block as BlockT; +use std::sync::Arc; + +/// A worker plays the executor gossip protocol. +pub struct GossipWorker { + gossip_validator: Arc>, + gossip_engine: Arc>>, + bundle_receiver: TracingUnboundedReceiver>, + execution_receipt_receiver: TracingUnboundedReceiver>, +} + +impl> GossipWorker { + pub(super) fn new( + gossip_validator: Arc>, + gossip_engine: Arc>>, + bundle_receiver: TracingUnboundedReceiver>, + execution_receipt_receiver: TracingUnboundedReceiver>, + ) -> Self { + Self { gossip_validator, gossip_engine, bundle_receiver, execution_receipt_receiver } + } + + fn gossip_bundle(&self, bundle: Bundle) { + let outgoing_message: GossipMessage = bundle.into(); + let encoded_message = outgoing_message.encode(); + self.gossip_validator.note_rebroadcasted(&encoded_message); + self.gossip_engine + .lock() + .gossip_message(topic::(), encoded_message, false); + } + + fn gossip_execution_receipt(&self, execution_receipt: ExecutionReceipt) { + let outgoing_message: GossipMessage = execution_receipt.into(); + let encoded_message = outgoing_message.encode(); + self.gossip_validator.note_rebroadcasted(&encoded_message); + self.gossip_engine + .lock() + .gossip_message(topic::(), encoded_message, false); + } + + pub(super) async fn run(mut self) { + let mut incoming = + Box::pin(self.gossip_engine.lock().messages_for(topic::()).filter_map( + |notification| async move { + GossipMessage::::decode(&mut ¬ification.message[..]).ok() + }, + )); + + loop { + let engine = self.gossip_engine.clone(); + let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); + + futures::select! { + gossip_message = incoming.next().fuse() => { + if let Some(message) = gossip_message { + tracing::debug!(target: LOG_TARGET, ?message, "Rebroadcasting an executor gossip message"); + match message { + GossipMessage::Bundle(bundle) => self.gossip_bundle(bundle), + GossipMessage::ExecutionReceipt(execution_receipt) => self.gossip_execution_receipt(execution_receipt) + } + } else { + return + } + } + bundle = self.bundle_receiver.next().fuse() => { + if let Some(bundle) = bundle { + self.gossip_bundle(bundle); + } + } + execution_receipt = self.execution_receipt_receiver.next().fuse() => { + if let Some(execution_receipt) = execution_receipt { + self.gossip_execution_receipt(execution_receipt); + } + } + _ = gossip_engine.fuse() => { + tracing::error!(target: LOG_TARGET, "Gossip engine has terminated."); + return; + } + } + } + } +} diff --git a/cumulus/parachain-template/node/Cargo.toml b/cumulus/parachain-template/node/Cargo.toml index 1aeb3cc764..6ec526a139 100644 --- a/cumulus/parachain-template/node/Cargo.toml +++ b/cumulus/parachain-template/node/Cargo.toml @@ -81,6 +81,7 @@ cumulus-client-cli = { path = "../../client/cli" } cumulus-client-consensus-common = { path = "../../client/consensus/common" } cumulus-client-consensus-relay-chain = { path = "../../client/consensus/relay-chain" } cirrus-client-executor = { path = "../../client/cirrus-executor" } +cirrus-client-executor-gossip = { path = "../../client/executor-gossip" } cirrus-client-service = { path = "../../client/cirrus-service" } cirrus-primitives = { path = "../../primitives" } diff --git a/cumulus/parachain-template/node/src/service.rs b/cumulus/parachain-template/node/src/service.rs index bc53d7b929..69e25b6d9a 100644 --- a/cumulus/parachain-template/node/src/service.rs +++ b/cumulus/parachain-template/node/src/service.rs @@ -227,9 +227,15 @@ where return Err("Light client not supported!".into()) } - let parachain_config = prepare_node_config(parachain_config); + let mut parachain_config = prepare_node_config(parachain_config); + + parachain_config + .network + .extra_sets + .push(cirrus_client_executor_gossip::executor_gossip_peers_set_config()); let params = new_partial::(¶chain_config, build_import_queue)?; + let (mut telemetry, _telemetry_worker_handle) = params.other; let relay_chain_full_node = @@ -297,7 +303,7 @@ where &task_manager, &relay_chain_full_node, transaction_pool.clone(), - network, + network.clone(), params.keystore_container.sync_keystore(), force_authoring, )?; @@ -314,6 +320,7 @@ where parachain_consensus, import_queue, transaction_pool, + network, }; cirrus_client_service::start_executor(params).await?; diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index ddfa3485f6..5d72a8ae2b 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -334,16 +334,17 @@ async fn process_primary_block( .await .await??; - let execution_receipt = match (config.processor)(block_hash, bundles, shuffling_seed).await { - Some(processor_result) => processor_result.to_execution_receipt(), - None => { - tracing::debug!( - target: LOG_TARGET, - "Skip sending the execution receipt because executor is not elected", - ); - return Ok(()) - }, - }; + let opaque_execution_receipt = + match (config.processor)(block_hash, bundles, shuffling_seed).await { + Some(processor_result) => processor_result.to_opaque_execution_receipt(), + None => { + tracing::debug!( + target: LOG_TARGET, + "Skip sending the execution receipt because executor is not elected", + ); + return Ok(()) + }, + }; let best_hash = request_best_primary_hash(ctx).await?; @@ -354,7 +355,7 @@ async fn process_primary_block( if let Err(err) = task_sender .send(AllMessages::RuntimeApi(RuntimeApiMessage::Request( best_hash, - RuntimeApiRequest::SubmitExecutionReceipt(execution_receipt), + RuntimeApiRequest::SubmitExecutionReceipt(opaque_execution_receipt), ))) .await { diff --git a/polkadot/node/core/runtime-api/src/cache.rs b/polkadot/node/core/runtime-api/src/cache.rs index acaa20d85b..52cbac9b55 100644 --- a/polkadot/node/core/runtime-api/src/cache.rs +++ b/polkadot/node/core/runtime-api/src/cache.rs @@ -55,7 +55,7 @@ impl Default for RequestResultCache { pub(crate) enum RequestResult { SubmitCandidateReceipt(Hash, u32, Hash), - SubmitExecutionReceipt(Hash, Hash), + SubmitExecutionReceipt(Hash), SubmitTransactionBundle(Hash, Hash), SubmitFraudProof(Hash), ExtractBundles(Hash), diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 20e2e21aec..a630fa8324 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -271,16 +271,16 @@ where RequestResult::SubmitCandidateReceipt(relay_parent, head_number, head_hash) }); }, - Request::SubmitExecutionReceipt(execution_receipt) => { + Request::SubmitExecutionReceipt(opaque_execution_receipt) => { let api = client.runtime_api(); - let execution_receipt_hash = execution_receipt.hash(); let res = api - .submit_execution_receipt_unsigned(&BlockId::Hash(relay_parent), execution_receipt) + .submit_execution_receipt_unsigned( + &BlockId::Hash(relay_parent), + opaque_execution_receipt, + ) .map_err(|e| RuntimeApiError::from(format!("{:?}", e))); metrics.on_request(res.is_ok()); - res.ok().map(|_res| { - RequestResult::SubmitExecutionReceipt(relay_parent, execution_receipt_hash) - }); + res.ok().map(|_res| RequestResult::SubmitExecutionReceipt(relay_parent)); }, Request::SubmitTransactionBundle(opaque_bundle) => { let api = client.runtime_api(); diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 6df51b1531..2a62c9b9a6 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -27,7 +27,7 @@ use futures::channel::oneshot; pub use sc_network::IfDisconnected; use cirrus_node_primitives::{BlockWeight, CollationGenerationConfig}; -use sp_executor::{ExecutionReceipt, FraudProof, OpaqueBundle}; +use sp_executor::{FraudProof, OpaqueBundle, OpaqueExecutionReceipt}; use sp_runtime::OpaqueExtrinsic; use subspace_core_primitives::Randomness; use subspace_runtime_primitives::{opaque::Header as BlockHeader, BlockNumber, Hash}; @@ -109,7 +109,7 @@ pub enum RuntimeApiRequest { // TODO: remove later SubmitCandidateReceipt(u32, Hash), /// Submit the execution receipt to primary chain. - SubmitExecutionReceipt(ExecutionReceipt), + SubmitExecutionReceipt(OpaqueExecutionReceipt), /// Submit the transaction bundle to primary chain. SubmitTransactionBundle(OpaqueBundle), /// Submit the fraud proof to primary chain.