diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 08789801..e9af1204 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -171,12 +171,11 @@ enum PeerStatus { Ready, ShutingDown, } -impl< - T: 'static + Default + NodeContext, - Chain: BlockchainInterface + UpdatableChainstate + 'static, - > UtreexoNode +impl UtreexoNode where + T: 'static + Default + NodeContext, WireError: From<::Error>, + Chain: BlockchainInterface + UpdatableChainstate + 'static, { pub fn new( chain: Arc, @@ -224,7 +223,7 @@ where initial_height: peer.height, }) } - async fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> { + fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> { if let Some(p) = self.peers.remove(&peer) { p.channel.close(); if !p.feeler && p.state == PeerStatus::Ready { @@ -359,30 +358,6 @@ where Ok((proof, hashes, inputs)) } - pub async fn ibd_handle_headers(&mut self, headers: Vec) -> Result<(), WireError> { - if headers.is_empty() { - // Start downloading blocks - self.chain.flush()?; - self.state = NodeState::DownloadBlocks; - return Ok(()); - } - self.last_headers_request = Instant::now(); - trace!( - "Downloading headers at: {} hash: {}", - self.chain.get_best_block()?.0, - headers[0].block_hash() - ); - for header in headers { - self.chain.accept_header(header)?; - } - let locator = self.chain.get_block_locator()?; - let peer = self - .send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE) - .await?; - self.inflight - .insert(InflightRequests::Headers, (peer, Instant::now())); - Ok(()) - } async fn send_to_peer(&self, peer_id: u32, req: NodeRequest) -> Result<(), WireError> { if let Some(peer) = &self.peers.get(&peer_id) { if peer.state == PeerStatus::Ready { @@ -446,7 +421,7 @@ where Ok(()) } #[inline] - pub async fn send_to_random_peer( + async fn send_to_random_peer( &mut self, req: NodeRequest, required_services: ServiceFlags, @@ -489,7 +464,7 @@ where Err(WireError::NoPeerToSendRequest) } - pub async fn init_peers(&mut self) -> Result<(), WireError> { + async fn init_peers(&mut self) -> Result<(), WireError> { let anchors = self .0 .address_man @@ -506,7 +481,7 @@ where Ok(()) } - pub async fn shutdown(&mut self) { + async fn shutdown(&mut self) { info!("Shutting down node"); for peer in self.peer_ids.iter() { try_and_log!(self.send_to_peer(*peer, NodeRequest::Shutdown).await); @@ -514,11 +489,11 @@ where try_and_log!(self.save_peers()); try_and_log!(self.chain.flush()); } - pub async fn ask_block(&mut self) -> Result<(), WireError> { + async fn ask_block(&mut self) -> Result<(), WireError> { let blocks = self.get_blocks_to_download()?; self.request_blocks(blocks).await } - pub async fn handle_broadcast(&self) -> Result<(), WireError> { + async fn handle_broadcast(&self) -> Result<(), WireError> { for (_, peer) in self.peers.iter() { if peer.services.has(ServiceFlags::NODE_UTREEXO) { continue; @@ -544,7 +519,7 @@ where } Ok(()) } - pub async fn ask_for_addresses(&mut self) -> Result<(), WireError> { + async fn ask_for_addresses(&mut self) -> Result<(), WireError> { let _ = self .send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE) .await?; @@ -630,6 +605,7 @@ where self.open_connection(feeler, peer_id, address).await; Some(()) } + /// Opens a new connection that doesn't require a proxy. #[allow(clippy::too_many_arguments)] fn open_non_proxy_connection( feeler: bool, @@ -652,6 +628,7 @@ where feeler, ) } + /// Opens a connection through a socks5 interface #[allow(clippy::too_many_arguments)] async fn open_proxy_connection( proxy: SocketAddr, @@ -690,6 +667,9 @@ where ); Ok(()) } + /// Creates a new outgoing connection with `address`. Connection may or may not be feeler, + /// a special connection type that is used to learn about good peers, but are not kept afer + /// handshake. async fn open_connection(&mut self, feeler: bool, peer_id: usize, address: LocalAddress) { let (requests_tx, requests_rx) = bounded(1024); if let Some(ref proxy) = self.socks5 { @@ -747,15 +727,14 @@ where self.peer_id_count += 1; } } -impl UtreexoNode + +/// An IBD node, should be used to get your chainstate up-to-date with the network, but +/// returns as soon as there's no more blocks to download. +impl UtreexoNode where WireError: From<::Error>, + Chain: BlockchainInterface + UpdatableChainstate + 'static, { - /// Processing blocks actually takes a lot of CPU time, and we need to wait until it either - /// succeed or fail before doing something else. This will hang our node up (not peers, only - /// the node) and make it do funky stuff, like timeout blocks we just got but not processed. - /// This task solves it by taking up the actual CPU time for processing blocks, while our - /// node's main loop can continue normally. async fn handle_block(chain: &Arc, block: UtreexoBlock) -> Result<(), WireError> { let (proof, del_hashes, inputs) = Self::process_proof( &block.udata.unwrap(), @@ -777,7 +756,7 @@ where })); Ok(()) } - pub async fn handle_headers(&mut self, headers: Vec) -> Result<(), WireError> { + async fn handle_headers(&mut self, headers: Vec) -> Result<(), WireError> { if headers.is_empty() { // Start downloading blocks self.chain.flush()?; @@ -900,7 +879,7 @@ where Ok(()) } - pub async fn handle_notification( + async fn handle_notification( &mut self, notification: Result, ) -> Result<(), WireError> { @@ -973,7 +952,7 @@ where } PeerMessages::Disconnected(idx) => { - self.handle_disconnection(peer, idx).await?; + self.handle_disconnection(peer, idx)?; if self.peer_ids.is_empty() || self.utreexo_peers.is_empty() { self.state = NodeState::WaitingPeer; @@ -991,9 +970,10 @@ where } } -impl UtreexoNode +impl UtreexoNode where WireError: From<::Error>, + Chain: BlockchainInterface + UpdatableChainstate + 'static, { /// Returns a handle to the node interface that we can use to request data from our /// node. This struct is thread safe, so we can use it from multiple threads and have @@ -1398,7 +1378,7 @@ where self.last_tip_update = Instant::now(); Ok(()) } - pub async fn handle_notification( + async fn handle_notification( &mut self, notification: Result, ) -> Result<(), WireError> { @@ -1428,7 +1408,7 @@ where } } PeerMessages::Disconnected(idx) => { - self.handle_disconnection(peer, idx).await?; + self.handle_disconnection(peer, idx)?; } PeerMessages::Addr(addresses) => { let addresses: Vec<_> =