More lenient ipv6 auto-update #266

Merged · 19 commits · Oct 14, 2024
40 changes: 39 additions & 1 deletion src/service.rs
@@ -817,7 +817,7 @@ impl Service {
kbucket::Entry::Present(_, status)
if status.is_connected() && !status.is_incoming());

-        if should_count {
+        if should_count | self.require_more_ip_votes(socket.is_ipv6()) {
// get the advertised local addresses
let (local_ip4_socket, local_ip6_socket) = {
let local_enr = self.local_enr.read();
@@ -1345,6 +1345,19 @@ impl Service {
}
InsertResult::ValueUpdated | InsertResult::UpdatedPending => {}
InsertResult::Failed(reason) => {
// On large networks with limited IPv6 nodes, it is hard to get enough
// PONG votes to estimate our external IP address, and the routing table
// is often full, so useful IPv6 nodes get rejected here.
//
// If we are low on votes and we initiated this connection (i.e., it was
// not forced on us by an inbound peer), then let's get a PONG from this node.

if direction == ConnectionDirection::Outgoing
&& self.require_more_ip_votes(enr.udp6_socket().is_some())
{
self.send_ping(enr, None);
}

self.peers_to_ping.remove(&node_id);
trace!(%node_id, ?reason, "Could not insert node");
}
@@ -1535,6 +1548,31 @@ impl Service {
}
}

/// Helper function that determines whether we need more votes for a specific IP
/// class.
///
/// Returns true if we are in dual-stack mode, we lack a vote majority for IPv4
/// or IPv6, and the requesting node/vote is of the class we are missing.
fn require_more_ip_votes(&mut self, is_ipv6: bool) -> bool {
if !matches!(self.ip_mode, IpMode::DualStack) {
return false;
}

let Some(ip_votes) = self.ip_votes.as_mut() else {
return false;
};
match (ip_votes.majority(), is_ipv6) {
// We don't have enough IPv4 votes and this is an IPv4-only node.
((None, Some(_)), false) |
// We don't have enough IPv6 votes and this is an IPv6 node.
((Some(_), None), true) |
// We don't have enough IPv6 or IPv4 votes; ping this peer.
((None, None), _) => true,
// We have enough votes; do nothing.
((_, _), _) => false,
}
}

/// A future that maintains the routing table and inserts nodes when required. This returns the
/// [`Event::NodeInserted`] variant if a new node has been inserted into the routing table.
async fn bucket_maintenance_poll(kbuckets: &Arc<RwLock<KBucketsTable<NodeId, Enr>>>) -> Event {
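Not part of the diff, for reference: the new require_more_ip_votes check reduces to a small decision table over (ip_votes.majority(), is_ipv6). The standalone Rust sketch below mirrors it with a hypothetical needs_vote helper, assuming ip_votes.majority() yields an (Option<SocketAddrV4>, Option<SocketAddrV6>) pair (IPv4 majority, IPv6 majority):

use std::net::{SocketAddrV4, SocketAddrV6};

// Hypothetical stand-in for Service::require_more_ip_votes, for illustration
// only; assumes the service is already known to be in dual-stack mode.
fn needs_vote(
    majority: (Option<SocketAddrV4>, Option<SocketAddrV6>),
    is_ipv6: bool,
) -> bool {
    match (majority, is_ipv6) {
        // Missing the IPv4 majority and the voter is IPv4: take the vote.
        ((None, Some(_)), false)
        // Missing the IPv6 majority and the voter is IPv6: take the vote.
        | ((Some(_), None), true)
        // Missing both majorities: take the vote either way.
        | ((None, None), _) => true,
        // Both majorities reached: no extra votes needed.
        _ => false,
    }
}

fn main() {
    let v4: Option<SocketAddrV4> = "127.0.0.1:9000".parse().ok();
    let v6: Option<SocketAddrV6> = "[::1]:9000".parse().ok();
    assert!(needs_vote((v4, None), true)); // short on IPv6 votes
    assert!(!needs_vote((v4, v6), true)); // both majorities reached
    assert!(needs_vote((None, None), false)); // short on both classes
}

The or-pattern groups the three vote-hungry cases into a single arm, so only the fully satisfied state (both majorities present) declines the extra PING.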
165 changes: 163 additions & 2 deletions src/service/test.rs
@@ -17,8 +17,18 @@ use crate::{
};
use enr::CombinedKey;
use parking_lot::RwLock;
-use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration};
-use tokio::sync::{mpsc, oneshot};
+use rand;
+use std::{
+    collections::HashMap,
+    net::{Ipv4Addr, Ipv6Addr},
+    sync::Arc,
+    time::Duration,
+};
+use tokio::sync::{
+    mpsc,
+    mpsc::{Sender, UnboundedReceiver},
+    oneshot,
+};

/// Default UDP port number to use for tests requiring UDP exposure
pub const DEFAULT_UDP_PORT: u16 = 0;
@@ -102,6 +112,65 @@ async fn build_service<P: ProtocolIdentity>(
}
}

fn build_non_handler_service(
local_enr: Arc<RwLock<Enr>>,
enr_key: Arc<RwLock<CombinedKey>>,
filters: bool,
) -> (Service, UnboundedReceiver<HandlerIn>, Sender<HandlerOut>) {
let listen_config = ListenConfig::Ipv4 {
ip: local_enr.read().ip4().unwrap(),
port: local_enr.read().udp4().unwrap(),
};
let config = ConfigBuilder::new(listen_config).build();

// Fakes the handler with empty channels.
let (handler_send, handler_recv_fake) = mpsc::unbounded_channel();
let (handler_send_fake, handler_recv) = mpsc::channel(1000);
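// The fake ends stand in for a real handler task: the test reads outbound
// HandlerIn messages from handler_recv_fake and can inject HandlerOut
// messages through handler_send_fake.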

let (table_filter, bucket_filter) = if filters {
(
Some(Box::new(kbucket::IpTableFilter) as Box<dyn kbucket::Filter<Enr>>),
Some(Box::new(kbucket::IpBucketFilter) as Box<dyn kbucket::Filter<Enr>>),
)
} else {
(None, None)
};

let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
local_enr.read().node_id().into(),
Duration::from_secs(60),
config.incoming_bucket_limit,
table_filter,
bucket_filter,
)));

let ip_vote = IpVote::new(10, Duration::from_secs(10000));

// create the required channels.
let (_discv5_send, discv5_recv) = mpsc::channel(30);
let (_exit_send, exit) = oneshot::channel();

let service = Service {
local_enr,
enr_key,
kbuckets,
queries: QueryPool::new(config.query_timeout),
active_requests: Default::default(),
active_nodes_responses: HashMap::new(),
ip_votes: Some(ip_vote),
handler_send,
handler_recv,
handler_exit: None,
peers_to_ping: HashSetDelay::new(config.ping_interval),
discv5_recv,
event_stream: None,
exit,
config,
ip_mode: IpMode::DualStack,
};
(service, handler_recv_fake, handler_send_fake)
}

#[tokio::test]
async fn test_updating_connection_on_ping() {
init();
@@ -341,3 +410,95 @@ async fn test_handling_concurrent_responses() {
assert!(service.active_requests.is_empty());
assert!(service.active_nodes_responses.is_empty());
}

fn generate_rand_ipv4() -> Ipv4Addr {
let a: u8 = rand::random();
let b: u8 = rand::random();
let c: u8 = rand::random();
let d: u8 = rand::random();
Ipv4Addr::new(a, b, c, d)
}

fn generate_rand_ipv6() -> Ipv6Addr {
let a: u16 = rand::random();
let b: u16 = rand::random();
let c: u16 = rand::random();
let d: u16 = rand::random();
let e: u16 = rand::random();
let f: u16 = rand::random();
let g: u16 = rand::random();
let h: u16 = rand::random();
Ipv6Addr::new(a, b, c, d, e, f, g, h)
}

fn random_connection_direction() -> ConnectionDirection {
let outgoing: bool = rand::random();
if outgoing {
ConnectionDirection::Outgoing
} else {
ConnectionDirection::Incoming
}
}

#[tokio::test]
async fn test_ipv6_update_amongst_ipv4_dominated_network() {
init();

let enr_key = CombinedKey::generate_secp256k1();
let ip = std::net::Ipv4Addr::LOCALHOST;
let local_enr = Enr::builder()
.ip4(ip)
.udp4(DEFAULT_UDP_PORT)
.build(&enr_key)
.unwrap();

let (mut service, mut handler_recv, _handler_send) = build_non_handler_service(
Arc::new(RwLock::new(local_enr)),
Arc::new(RwLock::new(enr_key)),
false,
);

// Load up the routing table with 100 random IPv4 ENRs.
for _ in 0..100 {
let key = CombinedKey::generate_secp256k1();
let ip = generate_rand_ipv4();
let enr = Enr::builder()
.ip4(ip)
.udp4(DEFAULT_UDP_PORT)
.build(&key)
.unwrap();

let direction = random_connection_direction();
service.inject_session_established(enr.clone(), direction);
}

// Attempt to add 10 IPv6 nodes and expect that we attempt to send 10 PINGs to IPv6 nodes.
for _ in 0..10 {
let key = CombinedKey::generate_secp256k1();
let ip = generate_rand_ipv6();
let enr = Enr::builder()
.ip6(ip)
.udp6(DEFAULT_UDP_PORT)
.build(&key)
.unwrap();

let direction = ConnectionDirection::Outgoing;
service.inject_session_established(enr.clone(), direction);
}

// Collect all the messages sent to the handler and count the PING requests to IPv6 addresses.
let mut v6_pings = 0;
while let Ok(event) = handler_recv.try_recv() {
if let HandlerIn::Request(contact, request) = event {
if contact.node_address().socket_addr.is_ipv6()
&& matches!(request.body, RequestBody::Ping { .. })
{
v6_pings += 1;
}
}
}

// Should be 10 IPv6 pings.
assert_eq!(v6_pings, 10);
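Not part of the diff: assuming the crate's standard test setup, the new test can be run on its own via cargo's name filter, e.g. cargo test test_ipv6_update_amongst_ipv4_dominated_network.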
}