Skip to content

Commit

Permalink
feat(client): Client will retry requests on fresh connections
Browse files Browse the repository at this point in the history
If a request sees an error on a pooled connection before ever writing
any bytes, it will now retry with a new connection.

This can be configured with `Config::retry_canceled_requests(bool)`.
  • Loading branch information
seanmonstar committed Feb 15, 2018
1 parent 0ea3bcf commit ee61ea9
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 114 deletions.
16 changes: 8 additions & 8 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use futures::sync::{mpsc, oneshot};
use common::Never;
use super::cancel::{Cancel, Canceled};

pub type Callback<U> = oneshot::Sender<::Result<U>>;
pub type Promise<U> = oneshot::Receiver<::Result<U>>;
pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>;
pub type Promise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;

pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::unbounded();
Expand All @@ -23,7 +23,7 @@ pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {

pub struct Sender<T, U> {
cancel: Cancel,
inner: mpsc::UnboundedSender<(T, Callback<U>)>,
inner: mpsc::UnboundedSender<(T, Callback<T, U>)>,
}

impl<T, U> Sender<T, U> {
Expand All @@ -35,7 +35,7 @@ impl<T, U> Sender<T, U> {
self.cancel.cancel();
}

pub fn send(&self, val: T) -> Result<Promise<U>, T> {
pub fn send(&self, val: T) -> Result<Promise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.unbounded_send((val, tx))
.map(move |_| rx)
Expand All @@ -54,11 +54,11 @@ impl<T, U> Clone for Sender<T, U> {

pub struct Receiver<T, U> {
canceled: Canceled,
inner: mpsc::UnboundedReceiver<(T, Callback<U>)>,
inner: mpsc::UnboundedReceiver<(T, Callback<T, U>)>,
}

impl<T, U> Stream for Receiver<T, U> {
type Item = (T, Callback<U>);
type Item = (T, Callback<T, U>);
type Error = Never;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Expand All @@ -83,9 +83,9 @@ impl<T, U> Drop for Receiver<T, U> {
// - Ready(None): the end. we want to stop looping
// - NotReady: unreachable
// - Err: unreachable
while let Ok(Async::Ready(Some((_val, cb)))) = self.inner.poll() {
while let Ok(Async::Ready(Some((val, cb)))) = self.inner.poll() {
// maybe in future, we pass the value along with the error?
let _ = cb.send(Err(::Error::new_canceled(None)));
let _ = cb.send(Err((::Error::new_canceled(None), Some(val))));
}
}

Expand Down
Loading

0 comments on commit ee61ea9

Please sign in to comment.