Skip to content

Commit

Permalink
fix(transport): return Poll::ready until error is consumed (#536)
Browse files Browse the repository at this point in the history
* fix(transport): return Poll::ready until error is consumed

When a lazy connection fails to connect it first
returns Poll::ready from the reconnect service,
yet the subsequent call returns Poll::pending
making tower_balance loop forever.
Instead, on error we return Ready
until the error is consumed in the
call method.

* chore: revert version change

* refactor: Into<Error> bounds for error intead of debug

* Remove fmt::Debug bound for reconnect

Co-authored-by: Helge Hoff <[email protected]>
  • Loading branch information
LucioFranco and tl-helge-hoff authored Jan 15, 2021
1 parent 31936e0 commit dafea9a
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions tonic/src/transport/service/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use tracing::trace;
pub(crate) struct Reconnect<M, Target>
where
M: Service<Target>,
M::Error: Into<Error>,
{
mk_service: M,
state: State<M::Future, M::Response>,
target: Target,
error: Option<M::Error>,
error: Option<crate::Error>,
has_been_connected: bool,
is_lazy: bool,
}
Expand All @@ -32,6 +33,7 @@ enum State<F, S> {
impl<M, Target> Reconnect<M, Target>
where
M: Service<Target>,
M::Error: Into<Error>,
{
pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self {
Reconnect {
Expand All @@ -52,14 +54,19 @@ where
M::Future: Unpin,
Error: From<M::Error> + From<S::Error>,
Target: Clone,
<M as tower_service::Service<Target>>::Error: Into<crate::Error>,
{
type Response = S::Response;
type Error = Error;
type Future = ResponseFuture<S::Future, M::Error>;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut state;

if self.error.is_some() {
return Poll::Ready(Ok(()));
}

loop {
match self.state {
State::Idle => {
Expand Down Expand Up @@ -94,7 +101,9 @@ where
if !(self.has_been_connected || self.is_lazy) {
return Poll::Ready(Err(e.into()));
} else {
self.error = Some(e);
let error = e.into();
tracing::error!("reconnect::poll_ready: {:?}", error);
self.error = Some(error);
break;
}
}
Expand Down Expand Up @@ -130,8 +139,10 @@ where
}

fn call(&mut self, request: Request) -> Self::Future {
tracing::trace!("Reconnect::call");
if let Some(error) = self.error.take() {
return ResponseFuture::error(error);
tracing::error!("error: {:?}", error);
return ResponseFuture::error(error.into());
}

let service = match self.state {
Expand All @@ -150,6 +161,7 @@ where
M::Future: fmt::Debug,
M::Response: fmt::Debug,
Target: fmt::Debug,
<M as tower_service::Service<Target>>::Error: Into<Error>,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Reconnect")
Expand All @@ -163,37 +175,36 @@ where
/// Future that resolves to the response or failure to connect.
#[pin_project]
#[derive(Debug)]
pub(crate) struct ResponseFuture<F, E> {
pub(crate) struct ResponseFuture<F> {
#[pin]
inner: Inner<F, E>,
inner: Inner<F>,
}

#[pin_project(project = InnerProj)]
#[derive(Debug)]
enum Inner<F, E> {
enum Inner<F> {
Future(#[pin] F),
Error(Option<E>),
Error(Option<crate::Error>),
}

impl<F, E> ResponseFuture<F, E> {
impl<F> ResponseFuture<F> {
pub(crate) fn new(inner: F) -> Self {
ResponseFuture {
inner: Inner::Future(inner),
}
}

pub(crate) fn error(error: E) -> Self {
pub(crate) fn error(error: crate::Error) -> Self {
ResponseFuture {
inner: Inner::Error(Some(error)),
}
}
}

impl<F, T, E, ME> Future for ResponseFuture<F, ME>
impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<Error>,
ME: Into<Error>,
{
type Output = Result<T, Error>;

Expand All @@ -203,7 +214,7 @@ where
match me.inner.project() {
InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into),
InnerProj::Error(e) => {
let e = e.take().expect("Polled after ready.").into();
let e = e.take().expect("Polled after ready.");
Poll::Ready(Err(e))
}
}
Expand Down

0 comments on commit dafea9a

Please sign in to comment.