diff --git a/src/service.rs b/src/service.rs index c28eaa20b..8b4093985 100644 --- a/src/service.rs +++ b/src/service.rs @@ -817,7 +817,7 @@ impl Service { kbucket::Entry::Present(_, status) if status.is_connected() && !status.is_incoming()); - if should_count { + if should_count | self.require_more_ip_votes(socket.is_ipv6()) { // get the advertised local addresses let (local_ip4_socket, local_ip6_socket) = { let local_enr = self.local_enr.read(); @@ -1345,6 +1345,19 @@ impl Service { } InsertResult::ValueUpdated | InsertResult::UpdatedPending => {} InsertResult::Failed(reason) => { + // On large networks with limited IPv6 nodes, it is hard to get enough + // PONG votes in order to estimate our external IP address. Often the + // routing table can be full, and so we reject useful IPv6 here. + // + // If we are low on votes and we initiated this connection (i.e it was not + // forced on us) then lets get a PONG from this node. + + if direction == ConnectionDirection::Outgoing + && self.require_more_ip_votes(enr.udp6_socket().is_some()) + { + self.send_ping(enr, None); + } + self.peers_to_ping.remove(&node_id); trace!(%node_id, ?reason, "Could not insert node"); } @@ -1535,6 +1548,31 @@ impl Service { } } + /// Helper function that determines if we need more votes for a specific IP + /// class. + /// + /// If we are in dual-stack mode and don't have enough votes for either ipv4 or ipv6 and the + /// requesting node/vote is what we need, then this will return true. + fn require_more_ip_votes(&mut self, is_ipv6: bool) -> bool { + if !matches!(self.ip_mode, IpMode::DualStack) { + return false; + } + + let Some(ip_votes) = self.ip_votes.as_mut() else { + return false; + }; + match (ip_votes.majority(), is_ipv6) { + // We don't have enough ipv4 votes, but this is an IPv4-only node. + ((None, Some(_)), false) | + // We don't have enough ipv6 votes, but this is an IPv6 node. + ((Some(_), None), true) | + // We don't have enough ipv6 or ipv4 nodes, ping this peer. + ((None, None), _,) => true, + // We have enough votes do nothing. + ((_, _), _,) => false, + } + } + /// A future that maintains the routing table and inserts nodes when required. This returns the /// [`Event::NodeInserted`] variant if a new node has been inserted into the routing table. async fn bucket_maintenance_poll(kbuckets: &Arc>>) -> Event { diff --git a/src/service/test.rs b/src/service/test.rs index c85f6a97a..128ad9e07 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -17,8 +17,18 @@ use crate::{ }; use enr::CombinedKey; use parking_lot::RwLock; -use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration}; -use tokio::sync::{mpsc, oneshot}; +use rand; +use std::{ + collections::HashMap, + net::{Ipv4Addr, Ipv6Addr}, + sync::Arc, + time::Duration, +}; +use tokio::sync::{ + mpsc, + mpsc::{Sender, UnboundedReceiver}, + oneshot, +}; /// Default UDP port number to use for tests requiring UDP exposure pub const DEFAULT_UDP_PORT: u16 = 0; @@ -102,6 +112,65 @@ async fn build_service( } } +fn build_non_handler_service( + local_enr: Arc>, + enr_key: Arc>, + filters: bool, +) -> (Service, UnboundedReceiver, Sender) { + let listen_config = ListenConfig::Ipv4 { + ip: local_enr.read().ip4().unwrap(), + port: local_enr.read().udp4().unwrap(), + }; + let config = ConfigBuilder::new(listen_config).build(); + + // Fake's the handler with empty channels. + let (handler_send, handler_recv_fake) = mpsc::unbounded_channel(); + let (handler_send_fake, handler_recv) = mpsc::channel(1000); + + let (table_filter, bucket_filter) = if filters { + ( + Some(Box::new(kbucket::IpTableFilter) as Box>), + Some(Box::new(kbucket::IpBucketFilter) as Box>), + ) + } else { + (None, None) + }; + + let kbuckets = Arc::new(RwLock::new(KBucketsTable::new( + local_enr.read().node_id().into(), + Duration::from_secs(60), + config.incoming_bucket_limit, + table_filter, + bucket_filter, + ))); + + let ip_vote = IpVote::new(10, Duration::from_secs(10000)); + + // create the required channels. + let (_discv5_send, discv5_recv) = mpsc::channel(30); + let (_exit_send, exit) = oneshot::channel(); + + let service = Service { + local_enr, + enr_key, + kbuckets, + queries: QueryPool::new(config.query_timeout), + active_requests: Default::default(), + active_nodes_responses: HashMap::new(), + ip_votes: Some(ip_vote), + handler_send, + handler_recv, + handler_exit: None, + peers_to_ping: HashSetDelay::new(config.ping_interval), + discv5_recv, + event_stream: None, + exit, + config, + ip_mode: IpMode::DualStack, + }; + (service, handler_recv_fake, handler_send_fake) +} + #[tokio::test] async fn test_updating_connection_on_ping() { init(); @@ -341,3 +410,95 @@ async fn test_handling_concurrent_responses() { assert!(service.active_requests.is_empty()); assert!(service.active_nodes_responses.is_empty()); } + +fn generate_rand_ipv4() -> Ipv4Addr { + let a: u8 = rand::random(); + let b: u8 = rand::random(); + let c: u8 = rand::random(); + let d: u8 = rand::random(); + Ipv4Addr::new(a, b, c, d) +} + +fn generate_rand_ipv6() -> Ipv6Addr { + let a: u16 = rand::random(); + let b: u16 = rand::random(); + let c: u16 = rand::random(); + let d: u16 = rand::random(); + let e: u16 = rand::random(); + let f: u16 = rand::random(); + let g: u16 = rand::random(); + let h: u16 = rand::random(); + Ipv6Addr::new(a, b, c, d, e, f, g, h) +} + +fn random_connection_direction() -> ConnectionDirection { + let outgoing: bool = rand::random(); + if outgoing { + ConnectionDirection::Outgoing + } else { + ConnectionDirection::Incoming + } +} + +#[tokio::test] +async fn test_ipv6_update_amongst_ipv4_dominated_network() { + init(); + + let enr_key = CombinedKey::generate_secp256k1(); + let ip = std::net::Ipv4Addr::LOCALHOST; + let local_enr = Enr::builder() + .ip4(ip) + .udp4(DEFAULT_UDP_PORT) + .build(&enr_key) + .unwrap(); + + let (mut service, mut handler_recv, _handler_send) = build_non_handler_service( + Arc::new(RwLock::new(local_enr)), + Arc::new(RwLock::new(enr_key)), + false, + ); + + // Load up the routing table with 100 random ENRs. + + for _ in 0..100 { + let key = CombinedKey::generate_secp256k1(); + let ip = generate_rand_ipv4(); + let enr = Enr::builder() + .ip4(ip) + .udp4(DEFAULT_UDP_PORT) + .build(&key) + .unwrap(); + + let direction = random_connection_direction(); + service.inject_session_established(enr.clone(), direction); + } + + // Attempt to add 10 IPv6 nodes and expect that we attempt to send 10 PING's to IPv6 nodes. + for _ in 0..10 { + let key = CombinedKey::generate_secp256k1(); + let ip = generate_rand_ipv6(); + let enr = Enr::builder() + .ip6(ip) + .udp6(DEFAULT_UDP_PORT) + .build(&key) + .unwrap(); + + let direction = ConnectionDirection::Outgoing; + service.inject_session_established(enr.clone(), direction); + } + + // Collect all the messages to the handler and count the PING requests for ENR v6 addresses. + let mut v6_pings = 0; + while let Ok(event) = handler_recv.try_recv() { + if let HandlerIn::Request(contact, request) = event { + if contact.node_address().socket_addr.is_ipv6() + && matches!(request.body, RequestBody::Ping { .. }) + { + v6_pings += 1 + } + } + } + + // Should be 10 ipv6 pings + assert_eq!(v6_pings, 10) +}