Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fefactor node.rs #83

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 27 additions & 47 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,11 @@ enum PeerStatus {
Ready,
ShutingDown,
}
impl<
T: 'static + Default + NodeContext,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
> UtreexoNode<T, Chain>
impl<T, Chain> UtreexoNode<T, Chain>
where
T: 'static + Default + NodeContext,
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
{
pub fn new(
chain: Arc<Chain>,
Expand Down Expand Up @@ -224,7 +223,7 @@ where
initial_height: peer.height,
})
}
async fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> {
fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> {
if let Some(p) = self.peers.remove(&peer) {
p.channel.close();
if !p.feeler && p.state == PeerStatus::Ready {
Expand Down Expand Up @@ -359,30 +358,6 @@ where

Ok((proof, hashes, inputs))
}
pub async fn ibd_handle_headers(&mut self, headers: Vec<BlockHeader>) -> Result<(), WireError> {
if headers.is_empty() {
// Start downloading blocks
self.chain.flush()?;
self.state = NodeState::DownloadBlocks;
return Ok(());
}
self.last_headers_request = Instant::now();
trace!(
"Downloading headers at: {} hash: {}",
self.chain.get_best_block()?.0,
headers[0].block_hash()
);
for header in headers {
self.chain.accept_header(header)?;
}
let locator = self.chain.get_block_locator()?;
let peer = self
.send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE)
.await?;
self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));
Ok(())
}
async fn send_to_peer(&self, peer_id: u32, req: NodeRequest) -> Result<(), WireError> {
if let Some(peer) = &self.peers.get(&peer_id) {
if peer.state == PeerStatus::Ready {
Expand Down Expand Up @@ -446,7 +421,7 @@ where
Ok(())
}
#[inline]
pub async fn send_to_random_peer(
async fn send_to_random_peer(
&mut self,
req: NodeRequest,
required_services: ServiceFlags,
Expand Down Expand Up @@ -489,7 +464,7 @@ where
Err(WireError::NoPeerToSendRequest)
}

pub async fn init_peers(&mut self) -> Result<(), WireError> {
async fn init_peers(&mut self) -> Result<(), WireError> {
let anchors = self
.0
.address_man
Expand All @@ -506,19 +481,19 @@ where
Ok(())
}

pub async fn shutdown(&mut self) {
async fn shutdown(&mut self) {
info!("Shutting down node");
for peer in self.peer_ids.iter() {
try_and_log!(self.send_to_peer(*peer, NodeRequest::Shutdown).await);
}
try_and_log!(self.save_peers());
try_and_log!(self.chain.flush());
}
pub async fn ask_block(&mut self) -> Result<(), WireError> {
async fn ask_block(&mut self) -> Result<(), WireError> {
let blocks = self.get_blocks_to_download()?;
self.request_blocks(blocks).await
}
pub async fn handle_broadcast(&self) -> Result<(), WireError> {
async fn handle_broadcast(&self) -> Result<(), WireError> {
for (_, peer) in self.peers.iter() {
if peer.services.has(ServiceFlags::NODE_UTREEXO) {
continue;
Expand All @@ -544,7 +519,7 @@ where
}
Ok(())
}
pub async fn ask_for_addresses(&mut self) -> Result<(), WireError> {
async fn ask_for_addresses(&mut self) -> Result<(), WireError> {
let _ = self
.send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE)
.await?;
Expand Down Expand Up @@ -630,6 +605,7 @@ where
self.open_connection(feeler, peer_id, address).await;
Some(())
}
/// Opens a new connection that doesn't require a proxy.
#[allow(clippy::too_many_arguments)]
fn open_non_proxy_connection(
feeler: bool,
Expand All @@ -652,6 +628,7 @@ where
feeler,
)
}
/// Opens a connection through a socks5 interface
#[allow(clippy::too_many_arguments)]
async fn open_proxy_connection(
proxy: SocketAddr,
Expand Down Expand Up @@ -690,6 +667,9 @@ where
);
Ok(())
}
/// Creates a new outgoing connection with `address`. Connection may or may not be feeler,
/// a special connection type that is used to learn about good peers, but are not kept afer
/// handshake.
async fn open_connection(&mut self, feeler: bool, peer_id: usize, address: LocalAddress) {
let (requests_tx, requests_rx) = bounded(1024);
if let Some(ref proxy) = self.socks5 {
Expand Down Expand Up @@ -747,15 +727,14 @@ where
self.peer_id_count += 1;
}
}
impl<Chain: BlockchainInterface + UpdatableChainstate + 'static> UtreexoNode<IBDNode, Chain>

/// An IBD node, should be used to get your chainstate up-to-date with the network, but
/// returns as soon as there's no more blocks to download.
impl<Chain> UtreexoNode<IBDNode, Chain>
where
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
{
/// Processing blocks actually takes a lot of CPU time, and we need to wait until it either
/// succeed or fail before doing something else. This will hang our node up (not peers, only
/// the node) and make it do funky stuff, like timeout blocks we just got but not processed.
/// This task solves it by taking up the actual CPU time for processing blocks, while our
/// node's main loop can continue normally.
async fn handle_block(chain: &Arc<Chain>, block: UtreexoBlock) -> Result<(), WireError> {
let (proof, del_hashes, inputs) = Self::process_proof(
&block.udata.unwrap(),
Expand All @@ -777,7 +756,7 @@ where
}));
Ok(())
}
pub async fn handle_headers(&mut self, headers: Vec<BlockHeader>) -> Result<(), WireError> {
async fn handle_headers(&mut self, headers: Vec<BlockHeader>) -> Result<(), WireError> {
if headers.is_empty() {
// Start downloading blocks
self.chain.flush()?;
Expand Down Expand Up @@ -900,7 +879,7 @@ where
Ok(())
}

pub async fn handle_notification(
async fn handle_notification(
&mut self,
notification: Result<NodeNotification, async_std::channel::RecvError>,
) -> Result<(), WireError> {
Expand Down Expand Up @@ -973,7 +952,7 @@ where
}

PeerMessages::Disconnected(idx) => {
self.handle_disconnection(peer, idx).await?;
self.handle_disconnection(peer, idx)?;

if self.peer_ids.is_empty() || self.utreexo_peers.is_empty() {
self.state = NodeState::WaitingPeer;
Expand All @@ -991,9 +970,10 @@ where
}
}

impl<Chain: BlockchainInterface + UpdatableChainstate + 'static> UtreexoNode<RunningNode, Chain>
impl<Chain> UtreexoNode<RunningNode, Chain>
where
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
{
/// Returns a handle to the node interface that we can use to request data from our
/// node. This struct is thread safe, so we can use it from multiple threads and have
Expand Down Expand Up @@ -1398,7 +1378,7 @@ where
self.last_tip_update = Instant::now();
Ok(())
}
pub async fn handle_notification(
async fn handle_notification(
&mut self,
notification: Result<NodeNotification, async_std::channel::RecvError>,
) -> Result<(), WireError> {
Expand Down Expand Up @@ -1428,7 +1408,7 @@ where
}
}
PeerMessages::Disconnected(idx) => {
self.handle_disconnection(peer, idx).await?;
self.handle_disconnection(peer, idx)?;
}
PeerMessages::Addr(addresses) => {
let addresses: Vec<_> =
Expand Down
Loading