From 02a9c29e2e816c8a583f65b372fcf7b8503e6bad Mon Sep 17 00:00:00 2001 From: Hrvoje Ban Date: Sun, 8 Jul 2018 10:11:38 +0200 Subject: [PATCH] feat(client): implement rfc 6555 (happy eyeballs) Update client connector to attempt a parallel connection using alternative address family, if connection using preferred address family takes too long. Closes: #1316 --- .travis.yml | 9 +- src/client/connect.rs | 302 +++++++++++++++++++++++++++++++++++++++--- src/client/dns.rs | 43 ++++++ 3 files changed, 337 insertions(+), 17 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5d40fcfe7a..405ae6f879 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ language: rust -sudo: false +sudo: true # Required for functional IPv6 (forces VM instead of Docker). dist: trusty matrix: fast_finish: true @@ -18,6 +18,13 @@ matrix: cache: apt: true +before_script: + # Add an IPv6 config - see the corresponding Travis issue + # https://github.com/travis-ci/travis-ci/issues/8361 + - if [ "${TRAVIS_OS_NAME}" == "linux" ]; then + sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6'; + fi + script: - ./.travis/readme.py - cargo build $FEATURES diff --git a/src/client/connect.rs b/src/client/connect.rs index d347426e3c..ae90f94d5a 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -386,7 +386,7 @@ mod http { use std::mem; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; - use std::time::Duration; + use std::time::{Duration, Instant}; use futures::{Async, Poll}; use futures::future::{Executor, ExecuteError}; @@ -396,6 +396,7 @@ mod http { use net2::TcpBuilder; use tokio_reactor::Handle; use tokio_tcp::{TcpStream, ConnectFuture}; + use tokio_timer::Delay; use super::super::dns; @@ -444,6 +445,7 @@ mod http { keep_alive_timeout: Option, nodelay: bool, local_address: Option, + happy_eyeballs_timeout: Option, } impl HttpConnector { @@ -481,6 +483,7 @@ mod http { keep_alive_timeout: None, nodelay: false, local_address: None, + happy_eyeballs_timeout: Some(Duration::from_millis(300)), } } @@ -519,6 +522,23 @@ mod http { pub fn set_local_address(&mut self, addr: Option) { self.local_address = addr; } + + /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. + /// + /// If hostname resolves to both IPv4 and IPv6 addresses and connection + /// cannot be established using preferred address family before timeout + /// elapses, then connector will in parallel attempt connection using other + /// address family. + /// + /// If `None`, parallel connection attempts are disabled. + /// + /// Default is 300 milliseconds. + /// + /// [RFC 6555]: https://tools.ietf.org/html/rfc6555 + #[inline] + pub fn set_happy_eyeballs_timeout(&mut self, dur: Option) { + self.happy_eyeballs_timeout = dur; + } } impl fmt::Debug for HttpConnector { @@ -564,6 +584,7 @@ mod http { handle: self.handle.clone(), keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, + happy_eyeballs_timeout: self.happy_eyeballs_timeout, } } } @@ -575,6 +596,7 @@ mod http { handle: handle.clone(), keep_alive_timeout: None, nodelay: false, + happy_eyeballs_timeout: None, } } @@ -607,6 +629,7 @@ mod http { handle: Option, keep_alive_timeout: Option, nodelay: bool, + happy_eyeballs_timeout: Option, } enum State { @@ -628,11 +651,8 @@ mod http { // If the host is already an IP addr (v4 or v6), // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { - state = State::Connecting(ConnectingTcp { - addrs: addrs, - local_addr: local_addr, - current: None - }) + state = State::Connecting(ConnectingTcp::new( + local_addr, addrs, self.happy_eyeballs_timeout)); } else { let host = mem::replace(host, String::new()); let work = dns::Work::new(host, port); @@ -643,11 +663,8 @@ mod http { match try!(future.poll()) { Async::NotReady => return Ok(Async::NotReady), Async::Ready(addrs) => { - state = State::Connecting(ConnectingTcp { - addrs: addrs, - local_addr: local_addr, - current: None, - }) + state = State::Connecting(ConnectingTcp::new( + local_addr, addrs, self.happy_eyeballs_timeout)); } }; }, @@ -676,14 +693,71 @@ mod http { } struct ConnectingTcp { - addrs: dns::IpAddrs, local_addr: Option, - current: Option, + preferred: ConnectingTcpRemote, + fallback: Option, } impl ConnectingTcp { + fn new( + local_addr: Option, + remote_addrs: dns::IpAddrs, + fallback_timeout: Option, + ) -> ConnectingTcp { + if let Some(fallback_timeout) = fallback_timeout { + let (preferred_addrs, fallback_addrs) = remote_addrs.split_by_preference(); + if fallback_addrs.is_empty() { + return ConnectingTcp { + local_addr, + preferred: ConnectingTcpRemote::new(preferred_addrs), + fallback: None, + }; + } + + ConnectingTcp { + local_addr, + preferred: ConnectingTcpRemote::new(preferred_addrs), + fallback: Some(ConnectingTcpFallback { + delay: Delay::new(Instant::now() + fallback_timeout), + remote: ConnectingTcpRemote::new(fallback_addrs), + }), + } + } else { + ConnectingTcp { + local_addr, + preferred: ConnectingTcpRemote::new(remote_addrs), + fallback: None, + } + } + } + } + + struct ConnectingTcpFallback { + delay: Delay, + remote: ConnectingTcpRemote, + } + + struct ConnectingTcpRemote { + addrs: dns::IpAddrs, + current: Option, + } + + impl ConnectingTcpRemote { + fn new(addrs: dns::IpAddrs) -> Self { + Self { + addrs, + current: None, + } + } + } + + impl ConnectingTcpRemote { // not a Future, since passing a &Handle to poll - fn poll(&mut self, handle: &Option) -> Poll { + fn poll( + &mut self, + local_addr: &Option, + handle: &Option, + ) -> Poll { let mut err = None; loop { if let Some(ref mut current) = self.current { @@ -694,14 +768,14 @@ mod http { err = Some(e); if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - *current = connect(&addr, &self.local_addr, handle)?; + *current = connect(&addr, local_addr, handle)?; continue; } } } } else if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - self.current = Some(connect(&addr, &self.local_addr, handle)?); + self.current = Some(connect(&addr, local_addr, handle)?); continue; } @@ -710,6 +784,54 @@ mod http { } } + impl ConnectingTcp { + // not a Future, since passing a &Handle to poll + fn poll(&mut self, handle: &Option) -> Poll { + match self.fallback.take() { + None => self.preferred.poll(&self.local_addr, handle), + Some(mut fallback) => match self.preferred.poll(&self.local_addr, handle) { + Ok(Async::Ready(stream)) => { + // Preferred successful - drop fallback. + Ok(Async::Ready(stream)) + } + Ok(Async::NotReady) => match fallback.delay.poll() { + Ok(Async::Ready(_)) => match fallback.remote.poll(&self.local_addr, handle) { + Ok(Async::Ready(stream)) => { + // Fallback successful - drop current preferred, + // but keep fallback as new preferred. + self.preferred = fallback.remote; + Ok(Async::Ready(stream)) + } + Ok(Async::NotReady) => { + // Neither preferred nor fallback are ready. + self.fallback = Some(fallback); + Ok(Async::NotReady) + } + Err(_) => { + // Fallback failed - resume with preferred only. + Ok(Async::NotReady) + } + }, + Ok(Async::NotReady) => { + // Too early to attempt fallback. + self.fallback = Some(fallback); + Ok(Async::NotReady) + } + Err(_) => { + // Fallback delay failed - resume with preferred only. + Ok(Async::NotReady) + } + } + Err(_) => { + // Preferred failed - use fallback as new preferred. + self.preferred = fallback.remote; + self.preferred.poll(&self.local_addr, handle) + } + } + } + } + } + // Make this Future unnameable outside of this crate. mod http_connector { use super::*; @@ -783,6 +905,154 @@ mod http { assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); } + + #[test] + fn client_happy_eyeballs() { + extern crate pretty_env_logger; + + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener}; + use std::time::{Duration, Instant}; + + use futures::{Async, Poll}; + use tokio::runtime::current_thread::Runtime; + use tokio_reactor::Handle; + + use super::dns; + use super::ConnectingTcp; + + let _ = pretty_env_logger::try_init(); + let server4 = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server4.local_addr().unwrap(); + let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap(); + let mut rt = Runtime::new().unwrap(); + + let local_timeout = Duration::default(); + let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1; + let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1; + let fallback_timeout = ::std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout) + + Duration::from_millis(250); + + let scenarios = &[ + // Fast primary, without fallback. + (&[local_ipv4_addr()][..], + 4, local_timeout, false), + (&[local_ipv6_addr()][..], + 6, local_timeout, false), + + // Fast primary, with (unused) fallback. + (&[local_ipv4_addr(), local_ipv6_addr()][..], + 4, local_timeout, false), + (&[local_ipv6_addr(), local_ipv4_addr()][..], + 6, local_timeout, false), + + // Unreachable + fast primary, without fallback. + (&[unreachable_ipv4_addr(), local_ipv4_addr()][..], + 4, unreachable_v4_timeout, false), + (&[unreachable_ipv6_addr(), local_ipv6_addr()][..], + 6, unreachable_v6_timeout, false), + + // Unreachable + fast primary, with (unused) fallback. + (&[unreachable_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..], + 4, unreachable_v4_timeout, false), + (&[unreachable_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..], + 6, unreachable_v6_timeout, true), + + // Slow primary, with (used) fallback. + (&[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..], + 6, fallback_timeout, false), + (&[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..], + 4, fallback_timeout, true), + + // Slow primary, with (used) unreachable + fast fallback. + (&[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..], + 6, fallback_timeout + unreachable_v6_timeout, false), + (&[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..], + 4, fallback_timeout + unreachable_v4_timeout, true), + ]; + + // Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network. + // Otherwise, connection to "slow" IPv6 address will error-out immediatelly. + let ipv6_accessible = measure_connect(slow_ipv6_addr()).0; + + for &(hosts, family, timeout, needs_ipv6_access) in scenarios { + if needs_ipv6_access && !ipv6_accessible { + continue; + } + + let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect(); + let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout)); + let fut = ConnectingTcpFuture(connecting_tcp); + + let start = Instant::now(); + let res = rt.block_on(fut).unwrap(); + let duration = start.elapsed(); + + // Allow actual duration to be +/- 150ms off. + let min_duration = if timeout >= Duration::from_millis(150) { + timeout - Duration::from_millis(150) + } else { + Duration::default() + }; + let max_duration = timeout + Duration::from_millis(150); + + assert_eq!(res, family); + assert!(duration >= min_duration); + assert!(duration <= max_duration); + } + + struct ConnectingTcpFuture(ConnectingTcp); + + impl Future for ConnectingTcpFuture { + type Item = u8; + type Error = ::std::io::Error; + + fn poll(&mut self) -> Poll { + match self.0.poll(&Some(Handle::default())) { + Ok(Async::Ready(stream)) => Ok(Async::Ready( + if stream.peer_addr().unwrap().is_ipv4() { 4 } else { 6 } + )), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err), + } + } + } + + fn local_ipv4_addr() -> IpAddr { + Ipv4Addr::new(127, 0, 0, 1).into() + } + + fn local_ipv6_addr() -> IpAddr { + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into() + } + + fn unreachable_ipv4_addr() -> IpAddr { + Ipv4Addr::new(127, 0, 0, 2).into() + } + + fn unreachable_ipv6_addr() -> IpAddr { + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into() + } + + fn slow_ipv4_addr() -> IpAddr { + // RFC 6890 reserved IPv4 address. + Ipv4Addr::new(198, 18, 0, 25).into() + } + + fn slow_ipv6_addr() -> IpAddr { + // RFC 6890 reserved IPv6 address. + Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into() + } + + fn measure_connect(addr: IpAddr) -> (bool, Duration) { + let start = Instant::now(); + let result = ::std::net::TcpStream::connect_timeout( + &(addr, 80).into(), Duration::from_secs(1)); + + let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut; + let duration = start.elapsed(); + (reachable, duration) + } + } } } diff --git a/src/client/dns.rs b/src/client/dns.rs index 182481d343..866b0e5e9c 100644 --- a/src/client/dns.rs +++ b/src/client/dns.rs @@ -35,6 +35,10 @@ pub struct IpAddrs { } impl IpAddrs { + pub fn new(addrs: Vec) -> Self { + IpAddrs { iter: addrs.into_iter() } + } + pub fn try_parse(host: &str, port: u16) -> Option { if let Ok(addr) = host.parse::() { let addr = SocketAddrV4::new(addr, port); @@ -46,6 +50,23 @@ impl IpAddrs { } None } + + pub fn split_by_preference(self) -> (IpAddrs, IpAddrs) { + let preferring_v6 = self.iter + .as_slice() + .first() + .map(SocketAddr::is_ipv6) + .unwrap_or(false); + + let (preferred, fallback) = self.iter + .partition::, _>(|addr| addr.is_ipv6() == preferring_v6); + + (IpAddrs::new(preferred), IpAddrs::new(fallback)) + } + + pub fn is_empty(&self) -> bool { + self.iter.as_slice().is_empty() + } } impl Iterator for IpAddrs { @@ -55,3 +76,25 @@ impl Iterator for IpAddrs { self.iter.next() } } + +#[cfg(test)] +mod tests { + use std::net::{Ipv4Addr, Ipv6Addr}; + use super::*; + + #[test] + fn test_ip_addrs_split_by_preference() { + let v4_addr = (Ipv4Addr::new(127, 0, 0, 1), 80).into(); + let v6_addr = (Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 80).into(); + + let (mut preferred, mut fallback) = + IpAddrs { iter: vec![v4_addr, v6_addr].into_iter() }.split_by_preference(); + assert!(preferred.next().unwrap().is_ipv4()); + assert!(fallback.next().unwrap().is_ipv6()); + + let (mut preferred, mut fallback) = + IpAddrs { iter: vec![v6_addr, v4_addr].into_iter() }.split_by_preference(); + assert!(preferred.next().unwrap().is_ipv6()); + assert!(fallback.next().unwrap().is_ipv4()); + } +}