Skip to content

Commit

Permalink
feat(client): add ALPN h2 support for client connectors
Browse files Browse the repository at this point in the history
- Adds `Connected::negotiated_h2()` method to signal the connection must
  use HTTP2. `Connect` implementations should set this if using ALPN.

If a connection to a host is detected to have been upgraded via ALPN,
any other oustanding connect futures will be canceled, and the waiting
requests will make use of the single HTTP2 connection.

The `http2_only` builder configuration still works the same, not
requiring ALPN at all, and always using only a single connection.
  • Loading branch information
seanmonstar committed Oct 31, 2018
1 parent bf188b2 commit 976a77a
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 95 deletions.
1 change: 1 addition & 0 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ impl Builder {
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
trace!("client handshake HTTP/{}", if self.http2 { 2 } else { 1 });
Handshake {
builder: self.clone(),
io: Some(io),
Expand Down
19 changes: 7 additions & 12 deletions src/client/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub trait Connect: Send + Sync {
/// A set of properties to describe where and how to try to connect.
#[derive(Clone, Debug)]
pub struct Destination {
//pub(super) alpn: Alpn,
pub(super) uri: Uri,
}

Expand All @@ -46,21 +45,18 @@ pub struct Destination {
/// was used, or if connected to an HTTP proxy.
#[derive(Debug)]
pub struct Connected {
//alpn: Alpn,
pub(super) alpn: Alpn,
pub(super) is_proxied: bool,
pub(super) extra: Option<Extra>,
}

pub(super) struct Extra(Box<ExtraInner>);

/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed
#[derive(Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub(super) enum Alpn {
Http1,
//H2,
//Http1OrH2
H2,
None,
}
*/

impl Destination {
/// Get the protocol scheme.
Expand Down Expand Up @@ -246,7 +242,7 @@ impl Connected {
/// Create new `Connected` type with empty metadata.
pub fn new() -> Connected {
Connected {
//alpn: Alpn::Http1,
alpn: Alpn::None,
is_proxied: false,
extra: None,
}
Expand Down Expand Up @@ -274,19 +270,18 @@ impl Connected {
self
}

/*
/// Set that the connected transport negotiated HTTP/2 as it's
/// next protocol.
pub fn h2(mut self) -> Connected {
pub fn negotiated_h2(mut self) -> Connected {
self.alpn = Alpn::H2;
self
}
*/

// Don't public expose that `Connected` is `Clone`, unsure if we want to
// keep that contract...
pub(super) fn clone(&self) -> Connected {
Connected {
alpn: self.alpn.clone(),
is_proxied: self.is_proxied,
extra: self.extra.clone(),
}
Expand Down
132 changes: 83 additions & 49 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ use futures::{Async, Future, Poll};
use futures::future::{self, Either, Executor};
use futures::sync::oneshot;
use http::{Method, Request, Response, Uri, Version};
use http::header::{Entry, HeaderValue, HOST};
use http::header::{HeaderValue, HOST};
use http::uri::Scheme;

use body::{Body, Payload};
use common::{Exec, lazy as hyper_lazy, Lazy};
use self::connect::{Connect, Connected, Destination};
use self::connect::{Alpn, Connect, Connected, Destination};
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};

#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
Expand Down Expand Up @@ -192,23 +192,19 @@ where C: Connect + Sync + 'static,

/// Send a constructed Request using this Client.
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
let is_http_11 = self.ver == Ver::Http1 && match req.version() {
Version::HTTP_11 => true,
Version::HTTP_10 => false,
other => {
let is_http_connect = req.method() == &Method::CONNECT;
match req.version() {
Version::HTTP_11 => (),
Version::HTTP_10 => if is_http_connect {
debug!("CONNECT is not allowed for HTTP/1.0");
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
},
other => if self.ver != Ver::Http2 {
error!("Request has unsupported version \"{:?}\"", other);
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())));
}
};

let is_http_connect = req.method() == &Method::CONNECT;

if !is_http_11 && is_http_connect {
debug!("client does not support CONNECT requests for {:?}", req.version());
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
}


let uri = req.uri().clone();
let domain = match (uri.scheme_part(), uri.authority_part()) {
(Some(scheme), Some(auth)) => {
Expand All @@ -233,21 +229,7 @@ where C: Connect + Sync + 'static,
}
};

if self.set_host && self.ver == Ver::Http1 {
if let Entry::Vacant(entry) = req.headers_mut().entry(HOST).expect("HOST is always valid header name") {
let hostname = uri.host().expect("authority implies host");
let host = if let Some(port) = uri.port() {
let s = format!("{}:{}", hostname, port);
HeaderValue::from_str(&s)
} else {
HeaderValue::from_str(hostname)
}.expect("uri host is valid header value");
entry.insert(host);
}
}


let pool_key = (Arc::new(domain.to_string()), self.ver);
let pool_key = Arc::new(domain.to_string());
ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key)))
}

Expand Down Expand Up @@ -283,23 +265,38 @@ where C: Connect + Sync + 'static,
fn send_request(&self, mut req: Request<B>, pool_key: PoolKey) -> impl Future<Item=Response<Body>, Error=ClientError<B>> {
let conn = self.connection_for(req.uri().clone(), pool_key);

let ver = self.ver;
let set_host = self.set_host;
let executor = self.executor.clone();
conn.and_then(move |mut pooled| {
if ver == Ver::Http1 {
// CONNECT always sends origin-form, so check it first...
if pooled.is_http1() {
if set_host {
let uri = req.uri().clone();
req
.headers_mut()
.entry(HOST)
.expect("HOST is always valid header name")
.or_insert_with(|| {
let hostname = uri.host().expect("authority implies host");
if let Some(port) = uri.port() {
let s = format!("{}:{}", hostname, port);
HeaderValue::from_str(&s)
} else {
HeaderValue::from_str(hostname)
}.expect("uri host is valid header value")
});
}

// CONNECT always sends authority-form, so check it first...
if req.method() == &Method::CONNECT {
authority_form(req.uri_mut());
} else if pooled.conn_info.is_proxied {
absolute_form(req.uri_mut());
} else {
origin_form(req.uri_mut());
};
} else {
debug_assert!(
req.method() != &Method::CONNECT,
"Client should have returned Error for HTTP2 CONNECT"
);
} else if req.method() == &Method::CONNECT {
debug!("client does not support CONNECT requests over HTTP2");
return Either::A(future::err(ClientError::Normal(::Error::new_user_unsupported_request_method())));
}

let fut = pooled.send_request_retryable(req)
Expand All @@ -322,10 +319,10 @@ where C: Connect + Sync + 'static,
// To counteract this, we must check if our senders 'want' channel
// has been closed after having tried to send. If so, error out...
if pooled.is_closed() {
return Either::A(fut);
return Either::B(Either::A(fut));
}

Either::B(fut
Either::B(Either::B(fut
.and_then(move |mut res| {
// If pooled is HTTP/2, we can toss this reference immediately.
//
Expand All @@ -337,7 +334,7 @@ where C: Connect + Sync + 'static,
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() {
if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
drop(pooled);
} else if !res.body().is_end_stream() {
let (delayed_tx, delayed_rx) = oneshot::channel();
Expand Down Expand Up @@ -370,7 +367,7 @@ where C: Connect + Sync + 'static,
}
}
Ok(res)
}))
})))
})
}

Expand Down Expand Up @@ -463,8 +460,9 @@ where C: Connect + Sync + 'static,
let pool = self.pool.clone();
let h1_writev = self.h1_writev;
let h1_title_case_headers = self.h1_title_case_headers;
let ver = self.ver;
let is_ver_h2 = self.ver == Ver::Http2;
let connector = self.connector.clone();
let ver = pool_key.1;
let dst = Destination {
uri,
};
Expand All @@ -474,7 +472,7 @@ where C: Connect + Sync + 'static,
// If the pool_key is for HTTP/2, and there is already a
// connection being estabalished, then this can't take a
// second lock. The "connect_to" future is Canceled.
let connecting = match pool.connecting(&pool_key) {
let connecting = match pool.connecting(&pool_key, ver) {
Some(lock) => lock,
None => {
let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress"));
Expand All @@ -484,11 +482,31 @@ where C: Connect + Sync + 'static,
Either::A(connector.connect(dst)
.map_err(::Error::new_connect)
.and_then(move |(io, connected)| {
conn::Builder::new()
// If ALPN is h2 and we aren't http2_only already,
// then we need to convert our pool checkout into
// a single HTTP2 one.
let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
match connecting.alpn_h2(&pool) {
Some(lock) => {
trace!("ALPN negotiated h2, updating pool");
lock
},
None => {
// Another connection has already upgraded,
// the pool checkout should finish up for us.
let canceled = ::Error::new_canceled(Some("ALPN upgraded to HTTP/2"));
return Either::B(future::err(canceled));
}
}
} else {
connecting
};
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
Either::A(conn::Builder::new()
.exec(executor.clone())
.h1_writev(h1_writev)
.h1_title_case_headers(h1_title_case_headers)
.http2_only(pool_key.1 == Ver::Http2)
.http2_only(is_h2)
.handshake(io)
.and_then(move |(tx, conn)| {
let bg = executor.execute(conn.map_err(|e| {
Expand All @@ -509,12 +527,13 @@ where C: Connect + Sync + 'static,
.map(move |tx| {
pool.pooled(connecting, PoolClient {
conn_info: connected,
tx: match ver {
Ver::Http1 => PoolTx::Http1(tx),
Ver::Http2 => PoolTx::Http2(tx.into_http2()),
tx: if is_h2 {
PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx)
},
})
})
}))
}))
})
}
Expand Down Expand Up @@ -591,6 +610,17 @@ impl<B> PoolClient<B> {
}
}

fn is_http1(&self) -> bool {
!self.is_http2()
}

fn is_http2(&self) -> bool {
match self.tx {
PoolTx::Http1(_) => false,
PoolTx::Http2(_) => true,
}
}

fn is_ready(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_ready(),
Expand Down Expand Up @@ -650,6 +680,10 @@ where
}
}
}

fn can_share(&self) -> bool {
self.is_http2()
}
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
Expand Down
Loading

0 comments on commit 976a77a

Please sign in to comment.