diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index d920c07ce2..55ef12cece 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -158,6 +158,9 @@ where debug!("flushed {} bytes", n); if self.write_buf.remaining() == 0 { break; + } else if n == 0 { + trace!("write returned zero, but {} bytes remaining", self.write_buf.remaining()); + return Err(io::ErrorKind::WriteZero.into()) } } try_nb!(self.io.flush()) @@ -391,8 +394,20 @@ impl Buf for VecOrBuf { #[inline] fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { match *self { - VecOrBuf::Vec(ref v) => v.bytes_vec(dst), - VecOrBuf::Buf(ref b) => b.bytes_vec(dst), + VecOrBuf::Vec(ref v) => { + if v.has_remaining() { + v.bytes_vec(dst) + } else { + 0 + } + }, + VecOrBuf::Buf(ref b) => { + if b.has_remaining() { + b.bytes_vec(dst) + } else { + 0 + } + }, } } } @@ -420,11 +435,12 @@ impl Buf for BufDeque { #[inline] fn bytes(&self) -> &[u8] { - if let Some(buf) = self.bufs.front() { - buf.bytes() - } else { - &[] + for buf in &self.bufs { + if buf.has_remaining() { + return buf.bytes(); + } } + &[] } #[inline] diff --git a/tests/client.rs b/tests/client.rs index b3552163ed..8183a1d20e 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1006,8 +1006,6 @@ mod dispatch_impl { assert_eq!(err.description(), "event loop gone"); } - - #[test] fn client_custom_executor() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); @@ -1046,6 +1044,45 @@ mod dispatch_impl { assert_eq!(closes.load(Ordering::Relaxed), 1); } + #[test] + fn client_body_mpsc() { + use futures::Sink; + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let closes = Arc::new(AtomicUsize::new(0)); + + let (tx1, rx1) = oneshot::channel(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); + let _ = tx1.send(()); + }); + + let uri = format!("http://{}/a", addr).parse().unwrap(); + + let client = Client::configure() + .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) + .build(&handle); + let mut req = Request::new(Method::Post, uri); + let (tx, body) = hyper::Body::pair(); + req.set_body(body); + let res = client.request(req).and_then(move |res| { + assert_eq!(res.status(), hyper::StatusCode::Ok); + res.body().concat2() + }); + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let send = tx.send_all(::futures::stream::iter_ok(vec!["hello"; 2]).map(hyper::Chunk::from).map(Ok)).then(|_| Ok(())); + core.run(res.join(send).join(rx).map(|r| r.0)).unwrap(); + } + struct DebugConnector(HttpConnector, Arc); impl Service for DebugConnector {