Skip to content

Commit

Permalink
feat(error): revamp hyper::Error type
Browse files Browse the repository at this point in the history
**The `Error` is now an opaque struct**, which allows for more variants to
be added freely, and the internal representation to change without being
breaking changes.

For inspecting an `Error`, there are several `is_*` methods to check for
certain classes of errors, such as `Error::is_parse()`. The `cause` can
also be inspected, like before. This likely seems like a downgrade, but
more inspection can be added as needed!

The `Error` now knows about more states, which gives much more context
around when a certain error occurs. This is also expressed in the
description and `fmt` messages.

**Most places where a user would provide an error to hyper can now pass
any error type** (`E: Into<Box<std::error::Error>>`). This error is passed
back in relevant places, and can be useful for logging. This should make
it much clearer about what error a user should provide to hyper: any it
feels is relevant!

Closes #1128
Closes #1130
Closes #1431
Closes #1338

BREAKING CHANGE: `Error` is no longer an enum to pattern match over, or
  to construct. Code will need to be updated accordingly.

  For body streams or `Service`s, inference might be unable to determine
  what error type you mean to return. Starting in Rust 1.26, you could
  just label that as `!` if you never return an error.
  • Loading branch information
seanmonstar committed Apr 10, 2018
1 parent 33874f9 commit 5d3c472
Show file tree
Hide file tree
Showing 22 changed files with 514 additions and 402 deletions.
4 changes: 2 additions & 2 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn throughput_fixedsize_large_payload(b: &mut test::Bencher) {
fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("content-length", "1000000"), || {
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter_ok(S.iter()).map(|&s| s))
Body::wrap_stream(stream::iter_ok::<_, String>(S.iter()).map(|&s| s))
})
}

Expand All @@ -96,7 +96,7 @@ fn throughput_chunked_large_payload(b: &mut test::Bencher) {
fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("transfer-encoding", "chunked"), || {
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter_ok(S.iter()).map(|&s| s))
Body::wrap_stream(stream::iter_ok::<_, String>(S.iter()).map(|&s| s))
})
}

Expand Down
5 changes: 3 additions & 2 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ fn main() {
println!("Response: {}", res.status());
println!("Headers: {:#?}", res.headers());

res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
res.into_body().into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk)
.map_err(|e| panic!("example expects stdout is open, error={}", e))
})
}).map(|_| {
println!("\n\nDone.");
Expand Down
3 changes: 2 additions & 1 deletion examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ fn main() {
let addr = ([127, 0, 0, 1], 3000).into();

let new_service = const_service(service_fn(|_| {
Ok(Response::new(Body::from(PHRASE)))
//TODO: when `!` is stable, replace error type
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}));

tokio::run(lazy(move || {
Expand Down
9 changes: 4 additions & 5 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use futures::future::lazy;
use futures::sync::oneshot;

use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::error::Error;
use hyper::server::{Http, Service};

use std::fs::File;
Expand All @@ -19,7 +18,7 @@ use std::thread;
static NOTFOUND: &[u8] = b"Not Found";
static INDEX: &str = "examples/send_file_index.html";

fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = hyper::Error> + Send> {
fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = io::Error> + Send> {
// Serve a file by reading it entirely into memory. As a result
// this is limited to serving small files, but it is somewhat
// simpler with a little less overhead.
Expand Down Expand Up @@ -56,15 +55,15 @@ fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = hyper:
};
});

Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e))))
Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))
}

struct ResponseExamples;

impl Service for ResponseExamples {
type Request = Request<Body>;
type Response = Response<Body>;
type Error = hyper::Error;
type Error = io::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;

fn call(&self, req: Request<Body>) -> Self::Future {
Expand Down Expand Up @@ -119,7 +118,7 @@ impl Service for ResponseExamples {
*/
});

Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e))))
Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))
},
(&Method::GET, "/no_file.html") => {
// Test what happens when file cannot be be found
Expand Down
24 changes: 12 additions & 12 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct SendRequest<B> {
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
inner: proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
Expand Down Expand Up @@ -138,7 +138,7 @@ impl<B> SendRequest<B>

impl<B> SendRequest<B>
where
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
/// Sends a `Request` on the associated connection.
///
Expand Down Expand Up @@ -262,7 +262,7 @@ impl<B> fmt::Debug for SendRequest<B> {
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
/// Return the inner IO object, and additional information.
pub fn into_parts(self) -> Parts<T> {
Expand All @@ -289,7 +289,7 @@ where
impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
type Item = ();
type Error = ::Error;
Expand All @@ -302,7 +302,7 @@ where
impl<T, B> fmt::Debug for Connection<T, B>
where
T: AsyncRead + AsyncWrite + fmt::Debug,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Connection")
Expand Down Expand Up @@ -331,7 +331,7 @@ impl Builder {
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
Handshake {
inner: HandshakeInner {
Expand All @@ -345,7 +345,7 @@ impl Builder {
pub(super) fn handshake_no_upgrades<T, B>(&self, io: T) -> HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
HandshakeNoUpgrades {
inner: HandshakeInner {
Expand All @@ -362,7 +362,7 @@ impl Builder {
impl<T, B> Future for Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
type Item = (SendRequest<B>, Connection<T, B>);
type Error = ::Error;
Expand All @@ -387,7 +387,7 @@ impl<T, B> fmt::Debug for Handshake<T, B> {
impl<T, B> Future for HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
type Item = (SendRequest<B>, proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
Expand All @@ -406,7 +406,7 @@ where
impl<T, B, R> Future for HandshakeInner<T, B, R>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
R: proto::Http1Transaction<
Incoming=StatusCode,
Outgoing=proto::RequestLine,
Expand Down Expand Up @@ -470,15 +470,15 @@ impl<B: Send> AssertSendSync for SendRequest<B> {}
impl<T: Send, B: Send> AssertSend for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
B::Data: Send + 'static,
{}

#[doc(hidden)]
impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
B::Data: Send + Sync + 'static,
{}

Expand Down
2 changes: 1 addition & 1 deletion src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub trait Connect: Send + Sync {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite + Send + 'static;
/// An error occured when trying to connect.
type Error;
type Error: Into<Box<StdError + Send + Sync>>;
/// A Future that will resolve to the connected Transport.
type Future: Future<Item=(Self::Transport, Connected), Error=Self::Error> + Send;
/// Connect to a destination.
Expand Down
13 changes: 8 additions & 5 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ impl<T, U> Sender<T, U> {
// there's room in the queue, but does the Connection
// want a message yet?
self.giver.poll_want()
.map_err(|_| ::Error::Closed)
.map_err(|_| ::Error::new_closed())
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(::Error::Closed),
Err(_) => Err(::Error::new_closed()),
}
}

Expand Down Expand Up @@ -184,9 +184,12 @@ mod tests {
drop(rx);

promise.then(|fulfilled| {
let res = fulfilled.expect("fulfilled");
match res.unwrap_err() {
(::Error::Cancel(_), Some(_)) => (),
let err = fulfilled
.expect("fulfilled")
.expect_err("promise should error");

match (err.0.kind(), err.1) {
(&::error::Kind::Canceled, Some(_)) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e),
}

Expand Down
49 changes: 21 additions & 28 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ impl<C, B> Client<C, B> {
}

impl<C, B> Client<C, B>
where C: Connect<Error=io::Error> + Sync + 'static,
where C: Connect + Sync + 'static,
C::Transport: 'static,
C::Future: 'static,
B: Entity<Error=::Error> + Send + 'static,
B: Entity + Send + 'static,
B::Data: Send,
{

Expand Down Expand Up @@ -139,13 +139,14 @@ where C: Connect<Error=io::Error> + Sync + 'static,
Version::HTTP_11 => (),
other => {
error!("Request has unsupported version \"{:?}\"", other);
return FutureResponse(Box::new(future::err(::Error::Version)));
//TODO: replace this with a proper variant
return FutureResponse(Box::new(future::err(::Error::new_user_unsupported_version())));
}
}

if req.method() == &Method::CONNECT {
debug!("Client does not support CONNECT requests");
return FutureResponse(Box::new(future::err(::Error::Method)));
return FutureResponse(Box::new(future::err(::Error::new_user_unsupported_request_method())));
}

let uri = req.uri().clone();
Expand All @@ -154,7 +155,8 @@ where C: Connect<Error=io::Error> + Sync + 'static,
format!("{}://{}", scheme, auth)
}
_ => {
return FutureResponse(Box::new(future::err(::Error::Io(
//TODO: replace this with a proper variant
return FutureResponse(Box::new(future::err(::Error::new_io(
io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URI for Client Request"
Expand Down Expand Up @@ -203,13 +205,13 @@ where C: Connect<Error=io::Error> + Sync + 'static,
};
future::lazy(move || {
connector.connect(dst)
.from_err()
.map_err(::Error::new_connect)
.and_then(move |(io, connected)| {
conn::Builder::new()
.h1_writev(h1_writev)
.handshake_no_upgrades(io)
.and_then(move |(tx, conn)| {
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?;
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)));
Ok(pool.pooled(pool_key, PoolClient {
is_proxied: connected.is_proxied,
tx: tx,
Expand Down Expand Up @@ -260,9 +262,7 @@ where C: Connect<Error=io::Error> + Sync + 'static,
} else if !res.body().is_empty() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
let _ = executor.execute(
executor.execute(
future::poll_fn(move || {
pooled.tx.poll_ready()
})
Expand All @@ -277,7 +277,6 @@ where C: Connect<Error=io::Error> + Sync + 'static,
Ok(res)
});


fut
});

Expand All @@ -290,9 +289,9 @@ where C: Connect<Error=io::Error> + Sync + 'static,
}

impl<C, B> Service for Client<C, B>
where C: Connect<Error=io::Error> + 'static,
where C: Connect + 'static,
C::Future: 'static,
B: Entity<Error=::Error> + Send + 'static,
B: Entity + Send + 'static,
B::Data: Send,
{
type Request = Request<B>;
Expand Down Expand Up @@ -353,9 +352,9 @@ struct RetryableSendRequest<C, B> {

impl<C, B> Future for RetryableSendRequest<C, B>
where
C: Connect<Error=io::Error> + 'static,
C: Connect + 'static,
C::Future: 'static,
B: Entity<Error=::Error> + Send + 'static,
B: Entity + Send + 'static,
B::Data: Send,
{
type Item = Response<Body>;
Expand Down Expand Up @@ -564,10 +563,10 @@ impl<C, B> Config<C, B> {
}

impl<C, B> Config<C, B>
where C: Connect<Error=io::Error>,
where C: Connect,
C::Transport: 'static,
C::Future: 'static,
B: Entity<Error=::Error> + Send,
B: Entity + Send,
B::Data: Send,
{
/// Construct the Client with this configuration.
Expand All @@ -590,7 +589,7 @@ where C: Connect<Error=io::Error>,

impl<B> Config<UseDefaultConnector, B>
where
B: Entity<Error=::Error> + Send,
B: Entity + Send,
B::Data: Send,
{
/// Construct the Client with this configuration.
Expand Down Expand Up @@ -640,7 +639,6 @@ impl<C: Clone, B> Clone for Config<C, B> {
}
}


// ===== impl Exec =====

#[derive(Clone)]
Expand All @@ -655,24 +653,19 @@ impl Exec {
Exec::Executor(Arc::new(executor))
}

fn execute<F>(&self, fut: F) -> io::Result<()>
fn execute<F>(&self, fut: F)
where
F: Future<Item=(), Error=()> + Send + 'static,
{
match *self {
Exec::Default => spawn(fut),
Exec::Executor(ref e) => {
e.execute(bg(Box::new(fut)))
let _ = e.execute(bg(Box::new(fut)))
.map_err(|err| {
debug!("executor error: {:?}", err.kind());
io::Error::new(
io::ErrorKind::Other,
"executor error",
)
})?
panic!("executor error: {:?}", err.kind());
});
},
}
Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl<T: Closed + Send + 'static> Pool<T> {
interval: interval,
pool: Arc::downgrade(&self.inner),
pool_drop_notifier: rx,
}).unwrap();
});
}
}

Expand Down
Loading

0 comments on commit 5d3c472

Please sign in to comment.