From 37570e82baf12951f8bb408884e31640b0d29dae Mon Sep 17 00:00:00 2001 From: Nathan West Date: Thu, 28 May 2020 22:07:13 -0400 Subject: [PATCH 01/31] Substantial refactor to the design of LineWriter This commit redesigns LineWriter to work more directly on the internals of BufWriter. This interface change is to enable a future Pull Request in which Stdout can be switched between Line and Block buffered mode. --- src/libstd/io/buffered.rs | 352 ++++++++++++++++++++++++++++---------- 1 file changed, 259 insertions(+), 93 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 046b1a6888024..fff4d0df7e769 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -449,6 +449,12 @@ impl Seek for BufReader { #[stable(feature = "rust1", since = "1.0.0")] pub struct BufWriter { inner: Option, + // FIXME: Replace this with a VecDeque. Because VecDeque is a Ring buffer, + // this would enable BufWriter to operate without any interior copies. + // It was also allow a much simpler implementation of flush_buf. The main + // blocker here is that VecDeque doesn't currently have the same + // slice-specific specializations (extend_from_slice, `Extend` + // specializations) buf: Vec, // #30888: If the inner writer panics in a call to write, we don't want to // write the buffered data a second time in BufWriter's destructor. This @@ -519,6 +525,13 @@ impl BufWriter { BufWriter { inner: Some(inner), buf: Vec::with_capacity(capacity), panicked: false } } + /// Send data in our local buffer into the inner writer, looping as + /// necessary until either it's all been sent or an error occurs. + /// + /// Because all the data in the buffer has been reported to our owner as + /// "successfully written" (by returning nonzero success values from + /// `write`), any 0-length writes from `inner` must be reported as i/o + /// errors from this method. fn flush_buf(&mut self) -> io::Result<()> { let mut written = 0; let len = self.buf.len(); @@ -548,6 +561,17 @@ impl BufWriter { ret } + /// Buffer some data without flushing it, regardless of the size of the + /// data. Writes as much as possible without exceeding capacity. Returns + /// the number of bytes written. + #[inline] + fn write_to_buffer(&mut self, buf: &[u8]) -> usize { + let available = self.buf.capacity() - self.buf.len(); + let amt_to_buffer = available.min(buf.len()); + self.buf.extend_from_slice(&buf[..amt_to_buffer]); + amt_to_buffer + } + /// Gets a reference to the underlying writer. /// /// # Examples @@ -665,7 +689,22 @@ impl Write for BufWriter { self.panicked = false; r } else { - self.buf.write(buf) + Ok(self.write_to_buffer(buf)) + } + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + if self.buf.len() + buf.len() > self.buf.capacity() { + self.flush_buf()?; + } + if buf.len() >= self.buf.capacity() { + self.panicked = true; + let r = self.get_mut().write_all(buf); + self.panicked = false; + r + } else { + self.write_to_buffer(buf); + Ok(()) } } @@ -818,6 +857,200 @@ impl fmt::Display for IntoInnerError { } } +/// Private helper struct for implementing the line-buffered writing logic. +/// This shim temporarily wraps a BufWriter, and uses its internals to +/// implement a line-buffered writer (specifically by using the internal +/// methods like write_to_buffer and flush_buffer). In this way, a more +/// efficient abstraction can be created than one that only had access to +/// `write` and `flush`, without needlessly duplicating a lot of the +/// implementation details of BufWriter. This also allows existing +/// `BufWriters` to be temporarily given line-buffering logic; this is what +/// enables Stdout to be alternately in line-buffered or block-buffered mode. +#[derive(Debug)] +pub(super) struct LineWriterShim<'a, W: Write> { + inner: &'a mut BufWriter, +} + +impl<'a, W: Write> LineWriterShim<'a, W> { + pub fn new(inner: &'a mut BufWriter) -> Self { + Self { inner } + } +} + +impl<'a, W: Write> Write for LineWriterShim<'a, W> { + /// Write some data into this BufReader with line buffering. This means + /// that, if any newlines are present in the data, the data up to the last + /// newline is sent directly to the underlying writer, and data after it + /// is buffered. Returns the number of bytes written. + /// + /// This function operates on a "best effort basis"; in keeping with the + /// convention of `Write::write`, it makes at most one attempt to write + /// new data to the underlying writer. If that write only reports a partial + /// success, the remaining data will be buffered. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines, even if the incoming data does not contain any newlines. + fn write(&mut self, buf: &[u8]) -> io::Result { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write + let newline_idx = match memchr::memrchr(b'\n', buf) { + None => { + // Check for prior partial line writes that need to be retried + if memchr::memchr(b'\n', &self.inner.buffer()).is_some() { + self.inner.flush_buf()?; + } + return self.inner.write(buf); + } + Some(i) => i, + }; + + // Flush existing content to prepare for our write + self.inner.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let lines = &buf[..newline_idx + 1]; + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.panicked here. + let flushed = self.inner.get_mut().write(lines)?; + + // Now that the write has succeeded, buffer the rest (or as much of + // the rest as possible). If there were any unwritten newlines, we + // only buffer out to the last unwritten newline; this helps prevent + // flushing partial lines on subsequent calls to write_buffered_lines. + let tail = &buf[flushed..]; + let buffered = match memchr::memrchr(b'\n', tail) { + None => self.inner.write_to_buffer(tail), + Some(i) => self.inner.write_to_buffer(&tail[..i + 1]), + }; + Ok(flushed + buffered) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } + + /// Write some vectored data into this BufReader with line buffering. This + /// means that, if any newlines are present in the data, the data up to + /// and including the buffer containing the last newline is sent directly + /// to the inner writer, and the data after it is buffered. Returns the + /// number of bytes written. + /// + /// This function operates on a "best effort basis"; in keeping with the + /// convention of `Write::write`, it makes at most one attempt to write + /// new data to the underlying writer. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines. + /// + /// Because sorting through an array of `IoSlice` can be a bit convoluted, + /// This method differs from write in the following ways: + /// + /// - It attempts to write all the buffers up to and including the one + /// containing the last newline. This means that it may attempt to + /// write a partial line. + /// - If the write only reports partial success, it does not attempt to + /// find the precise location of the written bytes and buffer the rest. + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + // Find the buffer containing the last newline + let last_newline_buf_idx = bufs + .iter() + .enumerate() + .rev() + .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i)); + + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write + let last_newline_buf_idx = match last_newline_buf_idx { + // No newlines; just do a normal buffered write + None => { + // Check for prior partial line writes that need to be retried + if memchr::memchr(b'\n', &self.inner.buffer()).is_some() { + self.inner.flush_buf()?; + } + return self.inner.write_vectored(bufs); + } + Some(i) => i, + }; + + // Flush existing content to prepare for our write + self.inner.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.panicked here. + let flushed = self.inner.write_vectored(lines)?; + + // Don't try to reconstruct the exact amount written; just bail + // in the event of a partial write + let lines_len = lines.iter().map(|buf| buf.len()).sum(); + if flushed < lines_len { + return Ok(flushed); + } + + // Now that the write has succeeded, buffer the rest (or as much of the + // rest as possible) + let buffered: usize = + tail.iter().map(|buf| self.inner.write_to_buffer(buf)).take_while(|&n| n > 0).sum(); + + Ok(flushed + buffered) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + /// Write some data into this BufReader with line buffering. This means + /// that, if any newlines are present in the data, the data up to the last + /// newline is sent directly to the underlying writer, and data after it + /// is buffered. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines, even if the incoming data does not contain any newlines. + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write + let newline_idx = match memchr::memrchr(b'\n', buf) { + None => { + // Check for prior partial line writes that need to be retried + if memchr::memchr(b'\n', &self.inner.buffer()).is_some() { + self.inner.flush_buf()?; + } + return self.inner.write_all(buf); + } + Some(i) => i, + }; + + // Flush existing content to prepare for our write + self.inner.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let (lines, tail) = buf.split_at(newline_idx + 1); + + // Write `lines` directly to the inner writer, bypassing the buffer. + self.inner.get_mut().write_all(lines)?; + + // Now that the write has succeeded, buffer the rest with BufWriter::write_all. + // This will buffer as much as possible, but continue flushing as + // necessary if our tail is huge. + self.inner.write_all(tail) + } +} + /// Wraps a writer and buffers output to it, flushing whenever a newline /// (`0x0a`, `'\n'`) is detected. /// @@ -885,7 +1118,6 @@ impl fmt::Display for IntoInnerError { #[stable(feature = "rust1", since = "1.0.0")] pub struct LineWriter { inner: BufWriter, - need_flush: bool, } impl LineWriter { @@ -926,7 +1158,7 @@ impl LineWriter { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { - LineWriter { inner: BufWriter::with_capacity(capacity, inner), need_flush: false } + LineWriter { inner: BufWriter::with_capacity(capacity, inner) } } /// Gets a reference to the underlying writer. @@ -1000,110 +1232,40 @@ impl LineWriter { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn into_inner(self) -> Result>> { - self.inner.into_inner().map_err(|IntoInnerError(buf, e)| { - IntoInnerError(LineWriter { inner: buf, need_flush: false }, e) - }) + self.inner + .into_inner() + .map_err(|IntoInnerError(buf, e)| IntoInnerError(LineWriter { inner: buf }, e)) } } #[stable(feature = "rust1", since = "1.0.0")] impl Write for LineWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - if self.need_flush { - self.flush()?; - } - - // Find the last newline character in the buffer provided. If found then - // we're going to write all the data up to that point and then flush, - // otherwise we just write the whole block to the underlying writer. - let i = match memchr::memrchr(b'\n', buf) { - Some(i) => i, - None => return self.inner.write(buf), - }; - - // Ok, we're going to write a partial amount of the data given first - // followed by flushing the newline. After we've successfully written - // some data then we *must* report that we wrote that data, so future - // errors are ignored. We set our internal `need_flush` flag, though, in - // case flushing fails and we need to try it first next time. - let n = self.inner.write(&buf[..=i])?; - self.need_flush = true; - if self.flush().is_err() || n != i + 1 { - return Ok(n); - } + LineWriterShim::new(&mut self.inner).write(buf) + } - // At this point we successfully wrote `i + 1` bytes and flushed it out, - // meaning that the entire line is now flushed out on the screen. While - // we can attempt to finish writing the rest of the data provided. - // Remember though that we ignore errors here as we've successfully - // written data, so we need to report that. - match self.inner.write(&buf[i + 1..]) { - Ok(i) => Ok(n + i), - Err(_) => Ok(n), - } + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() } - // Vectored writes are very similar to the writes above, but adjusted for - // the list of buffers that we have to write. fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { - if self.need_flush { - self.flush()?; - } + LineWriterShim::new(&mut self.inner).write_vectored(bufs) + } - // Find the last newline, and failing that write the whole buffer - let last_newline = bufs.iter().enumerate().rev().find_map(|(i, buf)| { - let pos = memchr::memrchr(b'\n', buf)?; - Some((i, pos)) - }); - let (i, j) = match last_newline { - Some(pair) => pair, - None => return self.inner.write_vectored(bufs), - }; - let (prefix, suffix) = bufs.split_at(i); - let (buf, suffix) = suffix.split_at(1); - let buf = &buf[0]; - - // Write everything up to the last newline, flushing afterwards. Note - // that only if we finished our entire `write_vectored` do we try the - // subsequent - // `write` - let mut n = 0; - let prefix_amt = prefix.iter().map(|i| i.len()).sum(); - if prefix_amt > 0 { - n += self.inner.write_vectored(prefix)?; - self.need_flush = true; - } - if n == prefix_amt { - match self.inner.write(&buf[..=j]) { - Ok(m) => n += m, - Err(e) if n == 0 => return Err(e), - Err(_) => return Ok(n), - } - self.need_flush = true; - } - if self.flush().is_err() || n != j + 1 + prefix_amt { - return Ok(n); - } + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } - // ... and now write out everything remaining - match self.inner.write(&buf[j + 1..]) { - Ok(i) => n += i, - Err(_) => return Ok(n), - } + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_all(buf) + } - if suffix.iter().map(|s| s.len()).sum::() == 0 { - return Ok(n); - } - match self.inner.write_vectored(suffix) { - Ok(i) => Ok(n + i), - Err(_) => Ok(n), - } + fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_all_vectored(bufs) } - fn flush(&mut self) -> io::Result<()> { - self.inner.flush()?; - self.need_flush = false; - Ok(()) + fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_fmt(fmt) } } @@ -1137,7 +1299,11 @@ mod tests { impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } } } From 0f3815886a0d7aaefd222b4592c3ef2ccfd0ebc2 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 31 May 2020 01:29:48 -0400 Subject: [PATCH 02/31] Updated comments; only pre-flush newline terminated buffers --- src/libstd/io/buffered.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index fff4d0df7e769..0ae5db0df1778 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -896,8 +896,10 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // one line), just do a regular buffered write let newline_idx = match memchr::memrchr(b'\n', buf) { None => { - // Check for prior partial line writes that need to be retried - if memchr::memchr(b'\n', &self.inner.buffer()).is_some() { + // Check for prior partial line writes that need to be retried. + // Only retry if the buffer contains a completed line, to + // avoid flushing partial lines. + if let Some(b'\n') = self.inner.buffer().last().copied() { self.inner.flush_buf()?; } return self.inner.write(buf); @@ -916,13 +918,13 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // `write` convention, make at most one attempt to add new (unbuffered) // data. Because this write doesn't touch the BufWriter state directly, // and the buffer is known to be empty, we don't need to worry about - // self.panicked here. + // self.inner.panicked here. let flushed = self.inner.get_mut().write(lines)?; // Now that the write has succeeded, buffer the rest (or as much of // the rest as possible). If there were any unwritten newlines, we // only buffer out to the last unwritten newline; this helps prevent - // flushing partial lines on subsequent calls to write_buffered_lines. + // flushing partial lines on subsequent calls to LineWriterShim::write. let tail = &buf[flushed..]; let buffered = match memchr::memrchr(b'\n', tail) { None => self.inner.write_to_buffer(tail), @@ -970,8 +972,10 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { let last_newline_buf_idx = match last_newline_buf_idx { // No newlines; just do a normal buffered write None => { - // Check for prior partial line writes that need to be retried - if memchr::memchr(b'\n', &self.inner.buffer()).is_some() { + // Check for prior partial line writes that need to be retried. + // Only retry if the buffer contains a completed line, to + // avoid flushing partial lines. + if let Some(b'\n') = self.inner.buffer().last().copied() { self.inner.flush_buf()?; } return self.inner.write_vectored(bufs); @@ -1025,8 +1029,10 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // one line), just do a regular buffered write let newline_idx = match memchr::memrchr(b'\n', buf) { None => { - // Check for prior partial line writes that need to be retried - if memchr::memchr(b'\n', &self.inner.buffer()).is_some() { + // Check for prior partial line writes that need to be retried. + // Only retry if the buffer contains a completed line, to + // avoid flushing partial lines. + if let Some(b'\n') = self.inner.buffer().last().copied() { self.inner.flush_buf()?; } return self.inner.write_all(buf); From 4a1597f7a77b17e42217d02750af2ae8012430af Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 31 May 2020 02:37:36 -0400 Subject: [PATCH 03/31] Expressionify `LineWriterShim::write` --- src/libstd/io/buffered.rs | 62 ++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 0ae5db0df1778..6098416df19b2 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -892,9 +892,9 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// writer, it will also flush the existing buffer if it contains any /// newlines, even if the incoming data does not contain any newlines. fn write(&mut self, buf: &[u8]) -> io::Result { - // If there are no new newlines (that is, if this write is less than - // one line), just do a regular buffered write - let newline_idx = match memchr::memrchr(b'\n', buf) { + match memchr::memrchr(b'\n', buf) { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write None => { // Check for prior partial line writes that need to be retried. // Only retry if the buffer contains a completed line, to @@ -902,35 +902,37 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { if let Some(b'\n') = self.inner.buffer().last().copied() { self.inner.flush_buf()?; } - return self.inner.write(buf); + self.inner.write(buf) + } + // Otherwise, arrange for the lines to be written directly to the + // inner writer. + Some(newline_idx) => { + // Flush existing content to prepare for our write + self.inner.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let lines = &buf[..newline_idx + 1]; + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.inner.panicked here. + let flushed = self.inner.get_mut().write(lines)?; + + // Now that the write has succeeded, buffer the rest (or as much of + // the rest as possible). If there were any unwritten newlines, we + // only buffer out to the last unwritten newline; this helps prevent + // flushing partial lines on subsequent calls to LineWriterShim::write. + let tail = &buf[flushed..]; + let buffered = match memchr::memrchr(b'\n', tail) { + None => self.inner.write_to_buffer(tail), + Some(i) => self.inner.write_to_buffer(&tail[..i + 1]), + }; + Ok(flushed + buffered) } - Some(i) => i, - }; - - // Flush existing content to prepare for our write - self.inner.flush_buf()?; - - // This is what we're going to try to write directly to the inner - // writer. The rest will be buffered, if nothing goes wrong. - let lines = &buf[..newline_idx + 1]; - - // Write `lines` directly to the inner writer. In keeping with the - // `write` convention, make at most one attempt to add new (unbuffered) - // data. Because this write doesn't touch the BufWriter state directly, - // and the buffer is known to be empty, we don't need to worry about - // self.inner.panicked here. - let flushed = self.inner.get_mut().write(lines)?; - - // Now that the write has succeeded, buffer the rest (or as much of - // the rest as possible). If there were any unwritten newlines, we - // only buffer out to the last unwritten newline; this helps prevent - // flushing partial lines on subsequent calls to LineWriterShim::write. - let tail = &buf[flushed..]; - let buffered = match memchr::memrchr(b'\n', tail) { - None => self.inner.write_to_buffer(tail), - Some(i) => self.inner.write_to_buffer(&tail[..i + 1]), }; - Ok(flushed + buffered) } fn flush(&mut self) -> io::Result<()> { From 5edad37b04d4daac62b01946142589e0f69aa22c Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 31 May 2020 03:21:51 -0400 Subject: [PATCH 04/31] Expressionify write_all --- src/libstd/io/buffered.rs | 40 ++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 6098416df19b2..de2a32345f1c9 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -932,7 +932,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { }; Ok(flushed + buffered) } - }; + } } fn flush(&mut self) -> io::Result<()> { @@ -1027,9 +1027,9 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// writer, it will also flush the existing buffer if it contains any /// newlines, even if the incoming data does not contain any newlines. fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - // If there are no new newlines (that is, if this write is less than - // one line), just do a regular buffered write - let newline_idx = match memchr::memrchr(b'\n', buf) { + match memchr::memrchr(b'\n', buf) { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write None => { // Check for prior partial line writes that need to be retried. // Only retry if the buffer contains a completed line, to @@ -1037,25 +1037,27 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { if let Some(b'\n') = self.inner.buffer().last().copied() { self.inner.flush_buf()?; } - return self.inner.write_all(buf); + self.inner.write_all(buf) } - Some(i) => i, - }; - - // Flush existing content to prepare for our write - self.inner.flush_buf()?; + // Otherwise, arrange for the lines to be written directly to the + // inner writer. + Some(newline_idx) => { + // Flush existing content to prepare for our write + self.inner.flush_buf()?; - // This is what we're going to try to write directly to the inner - // writer. The rest will be buffered, if nothing goes wrong. - let (lines, tail) = buf.split_at(newline_idx + 1); + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let (lines, tail) = buf.split_at(newline_idx + 1); - // Write `lines` directly to the inner writer, bypassing the buffer. - self.inner.get_mut().write_all(lines)?; + // Write `lines` directly to the inner writer, bypassing the buffer. + self.inner.get_mut().write_all(lines)?; - // Now that the write has succeeded, buffer the rest with BufWriter::write_all. - // This will buffer as much as possible, but continue flushing as - // necessary if our tail is huge. - self.inner.write_all(tail) + // Now that the write has succeeded, buffer the rest with BufWriter::write_all. + // This will buffer as much as possible, but continue flushing as + // necessary if our tail is huge. + self.inner.write_all(tail) + } + } } } From 1bf8ba3546aace5a16852d5f58fd9206934cc829 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 31 May 2020 03:28:07 -0400 Subject: [PATCH 05/31] x.py fmt --- src/libstd/io/buffered.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index de2a32345f1c9..35434979b85ad 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -1309,11 +1309,7 @@ mod tests { impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { - Ok(0) - } else { - Ok(self.lengths.remove(0)) - } + if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } } } From e0dfdc683def9573f28d7f2f077790bc3c52132b Mon Sep 17 00:00:00 2001 From: Nathan West Date: Mon, 1 Jun 2020 00:26:48 -0400 Subject: [PATCH 06/31] Added check for `is_write_vectored` --- src/libstd/io/buffered.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 35434979b85ad..4ae2a5eaf95a9 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -961,7 +961,19 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// write a partial line. /// - If the write only reports partial success, it does not attempt to /// find the precise location of the written bytes and buffer the rest. + /// + /// If the underlying vector doesn't support vectored writing, we instead + /// simply write the first non-empty buffer with `write`. This way, we + /// get the benefits of more granular partial-line handling without losing + /// anything in efficiency fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + if !self.is_write_vectored() { + return match bufs.iter().find(|buf| !buf.is_empty()) { + Some(buf) => self.write(buf), + None => Ok(0), + } + } + // Find the buffer containing the last newline let last_newline_buf_idx = bufs .iter() From 2c3024b368cb2810e390f980c3f9820cc07389e8 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Mon, 1 Jun 2020 00:36:31 -0400 Subject: [PATCH 07/31] Add comment describing erroneous_flush_retried --- src/libstd/io/buffered.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 4ae2a5eaf95a9..04c1cbf5a17d3 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -1766,6 +1766,15 @@ mod tests { } } + /// Previously the `LineWriter` could successfully write some bytes but + /// then fail to report that it has done so. Additionally, an erroneous + /// flush after a successful write was permanently ignored. + /// + /// Test that a line writer correctly reports the number of written bytes, + /// and that it attempts to flush buffered lines from previous writes + /// before processing new data + /// + /// Regression test for #37807 #[test] fn erroneous_flush_retried() { let a = AcceptOneThenFail { written: false, flushed: false }; From e89e2e42f9269fe401b1eef0288ff12c7941544f Mon Sep 17 00:00:00 2001 From: Nathan West Date: Mon, 1 Jun 2020 01:04:33 -0400 Subject: [PATCH 08/31] Added test stubs --- src/libstd/io/buffered.rs | 109 ++++++++++++++++++++++++++++++-------- 1 file changed, 86 insertions(+), 23 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 04c1cbf5a17d3..09b24d810c526 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -971,7 +971,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { return match bufs.iter().find(|buf| !buf.is_empty()) { Some(buf) => self.write(buf), None => Ok(0), - } + }; } // Find the buffer containing the last newline @@ -1321,7 +1321,11 @@ mod tests { impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } } } @@ -1742,27 +1746,48 @@ mod tests { b.iter(|| BufWriter::new(io::sink())); } - struct AcceptOneThenFail { - written: bool, + #[derive(Default, Clone)] + struct ProgrammableSink { + // Writes append to this slice + buffer: Vec, + + // Flushes set this flag flushed: bool, + + // If true, writes & flushes will always be an error + return_error: bool, + + // If set, only up to this number of bytes will be written in a single + // call to `write` + accept_prefix: Option, + + // If set, counts down with each write, and writes return an error + // when it hits 0 + max_writes: Option, } - impl Write for AcceptOneThenFail { + impl Write for ProgrammableSink { fn write(&mut self, data: &[u8]) -> io::Result { - if !self.written { - assert_eq!(data, b"a\nb\n"); - self.written = true; - Ok(data.len()) + if self.return_error { + Err(io::Error::new(io::ErrorKind::Other, "test")) } else { - Err(io::Error::new(io::ErrorKind::NotFound, "test")) + let len = match self.accept_prefix { + None => data.len(), + Some(prefix) => prefix.min(prefix), + }; + let data = &data[..len]; + self.buffer.extend_from_slice(data); + Ok(len) } } fn flush(&mut self) -> io::Result<()> { - assert!(self.written); - assert!(!self.flushed); - self.flushed = true; - Err(io::Error::new(io::ErrorKind::Other, "test")) + if self.return_error { + Err(io::Error::new(io::ErrorKind::Other, "test")) + } else { + self.flushed = true; + Ok(()) + } } } @@ -1777,15 +1802,7 @@ mod tests { /// Regression test for #37807 #[test] fn erroneous_flush_retried() { - let a = AcceptOneThenFail { written: false, flushed: false }; - - let mut l = LineWriter::new(a); - assert_eq!(l.write(b"a\nb\na").unwrap(), 4); - assert!(l.get_ref().written); - assert!(l.get_ref().flushed); - l.get_mut().flushed = false; - - assert_eq!(l.write(b"a").unwrap_err().kind(), io::ErrorKind::Other) + todo!() } #[test] @@ -1895,4 +1912,50 @@ mod tests { io::Error::new(io::ErrorKind::Other, "x") } } + + /// Test that, given this input: + /// + /// Line 1\n + /// Line 2\n + /// Line 3\n + /// Line 4 + /// + /// And given a result that only writes to midway through Line 2 + /// + /// That only up to the end of Line 3 is buffered + /// + /// This behavior is desirable because it prevents flushing partial lines + #[test] + fn test_partial_write_buffers_line() { + todo!() + } + + /// Test that, given this input: + /// + /// Line 1\n + /// Line 2\n + /// Line 3 + /// + /// And given that the full write of lines 1 and 2 was successful + /// That data up to Line 3 is buffered + #[test] + fn test_partial_line_buffered_after_line_write() { + todo!() + } + + /// Test that, given a partial line that exceeds the length of + /// LineBuffer's buffer (that is, without a trailing newline), that that + /// line is written to the inner writer + #[test] + fn test_long_line_flushed() { + todo!() + } + + /// Test that, given a very long partial line *after* successfully + /// flushing a complete line, that that line is buffered unconditionally, + /// and no additional writes take place + #[test] + fn test_long_tail_not_flushed() { + todo!() + } } From f0a08073e47cc4a832cea66a7ef6bd9506061a72 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Mon, 1 Jun 2020 17:36:55 -0400 Subject: [PATCH 09/31] Various testing & implementation updates: - Added a bunch of new unit tests - Removed test_line_buffer_fail_flush - Updated erroneous_flush_retried - Added helper methods to LineWriterShim for code clarity, to distinguish "self.buffer" (the BufWriter) from self.inner (the thing wrapped by the BufWriter) - Un-expressionized write & write_all - Added clause to bail early on Ok(0) --- src/libstd/io/buffered.rs | 344 +++++++++++++++++++++++++------------- 1 file changed, 232 insertions(+), 112 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 09b24d810c526..730ae8a0ee471 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -868,12 +868,30 @@ impl fmt::Display for IntoInnerError { /// enables Stdout to be alternately in line-buffered or block-buffered mode. #[derive(Debug)] pub(super) struct LineWriterShim<'a, W: Write> { - inner: &'a mut BufWriter, + buffer: &'a mut BufWriter, } impl<'a, W: Write> LineWriterShim<'a, W> { - pub fn new(inner: &'a mut BufWriter) -> Self { - Self { inner } + pub fn new(buffer: &'a mut BufWriter) -> Self { + Self { buffer } + } + + /// Get a reference to the inner writer (that is, the writer wrapped by + /// the BufWriter) + fn inner(&self) -> &W { + self.buffer.get_ref() + } + + /// Get a mutable reference to the inner writer (that is, the writer + /// wrapped by the BufWriter). Be careful with this writer, as writes to + /// it will bypass the buffer. + fn inner_mut(&mut self) -> &mut W { + self.buffer.get_mut() + } + + /// Get the content currently buffered in self.buffer + fn buffered(&self) -> &[u8] { + self.buffer.buffer() } } @@ -892,51 +910,58 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// writer, it will also flush the existing buffer if it contains any /// newlines, even if the incoming data does not contain any newlines. fn write(&mut self, buf: &[u8]) -> io::Result { - match memchr::memrchr(b'\n', buf) { + let newline_idx = match memchr::memrchr(b'\n', buf) { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write None => { // Check for prior partial line writes that need to be retried. // Only retry if the buffer contains a completed line, to // avoid flushing partial lines. - if let Some(b'\n') = self.inner.buffer().last().copied() { - self.inner.flush_buf()?; + if let Some(b'\n') = self.buffered().last().copied() { + self.buffer.flush_buf()?; } - self.inner.write(buf) + return self.buffer.write(buf); } // Otherwise, arrange for the lines to be written directly to the // inner writer. - Some(newline_idx) => { - // Flush existing content to prepare for our write - self.inner.flush_buf()?; - - // This is what we're going to try to write directly to the inner - // writer. The rest will be buffered, if nothing goes wrong. - let lines = &buf[..newline_idx + 1]; - - // Write `lines` directly to the inner writer. In keeping with the - // `write` convention, make at most one attempt to add new (unbuffered) - // data. Because this write doesn't touch the BufWriter state directly, - // and the buffer is known to be empty, we don't need to worry about - // self.inner.panicked here. - let flushed = self.inner.get_mut().write(lines)?; - - // Now that the write has succeeded, buffer the rest (or as much of - // the rest as possible). If there were any unwritten newlines, we - // only buffer out to the last unwritten newline; this helps prevent - // flushing partial lines on subsequent calls to LineWriterShim::write. - let tail = &buf[flushed..]; - let buffered = match memchr::memrchr(b'\n', tail) { - None => self.inner.write_to_buffer(tail), - Some(i) => self.inner.write_to_buffer(&tail[..i + 1]), - }; - Ok(flushed + buffered) - } + Some(newline_idx) => newline_idx, + }; + + // Flush existing content to prepare for our write + self.buffer.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let lines = &buf[..newline_idx + 1]; + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.buffer.panicked here. + let flushed = self.inner_mut().write(lines)?; + + // If buffer returns Ok(0), propagate that to the caller without + // doing additional buffering; otherwise we're just guaranteeing + // an "ErrorKind::WriteZero" later. + if flushed == 0 { + return Ok(0); } + + // Now that the write has succeeded, buffer the rest (or as much of + // the rest as possible). If there were any unwritten newlines, we + // only buffer out to the last unwritten newline; this helps prevent + // flushing partial lines on subsequent calls to LineWriterShim::write. + let tail = &buf[flushed..]; + let buffered = match memchr::memrchr(b'\n', tail) { + None => self.buffer.write_to_buffer(tail), + Some(i) => self.buffer.write_to_buffer(&tail[..i + 1]), + }; + Ok(flushed + buffered) } fn flush(&mut self) -> io::Result<()> { - self.inner.flush() + self.buffer.flush() } /// Write some vectored data into this BufReader with line buffering. This @@ -967,6 +992,8 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// get the benefits of more granular partial-line handling without losing /// anything in efficiency fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + // If there's no specialized behavior for write_vectored, just use + // write. This has the benefit of more granular partial-line handling. if !self.is_write_vectored() { return match bufs.iter().find(|buf| !buf.is_empty()) { Some(buf) => self.write(buf), @@ -989,16 +1016,16 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // Check for prior partial line writes that need to be retried. // Only retry if the buffer contains a completed line, to // avoid flushing partial lines. - if let Some(b'\n') = self.inner.buffer().last().copied() { - self.inner.flush_buf()?; + if let Some(b'\n') = self.buffered().last().copied() { + self.buffer.flush_buf()?; } - return self.inner.write_vectored(bufs); + return self.buffer.write_vectored(bufs); } Some(i) => i, }; // Flush existing content to prepare for our write - self.inner.flush_buf()?; + self.buffer.flush_buf()?; // This is what we're going to try to write directly to the inner // writer. The rest will be buffered, if nothing goes wrong. @@ -1009,7 +1036,14 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // data. Because this write doesn't touch the BufWriter state directly, // and the buffer is known to be empty, we don't need to worry about // self.panicked here. - let flushed = self.inner.write_vectored(lines)?; + let flushed = self.inner_mut().write_vectored(lines)?; + + // If inner returns Ok(0), propagate that to the caller without + // doing additional buffering; otherwise we're just guaranteeing + // an "ErrorKind::WriteZero" later. + if flushed == 0 { + return Ok(0); + } // Don't try to reconstruct the exact amount written; just bail // in the event of a partial write @@ -1021,13 +1055,15 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // Now that the write has succeeded, buffer the rest (or as much of the // rest as possible) let buffered: usize = - tail.iter().map(|buf| self.inner.write_to_buffer(buf)).take_while(|&n| n > 0).sum(); + tail.iter().map(|buf| self.buffer.write_to_buffer(buf)).take_while(|&n| n > 0).sum(); Ok(flushed + buffered) } fn is_write_vectored(&self) -> bool { - self.inner.is_write_vectored() + // It's hard to imagine these diverging, but it's worth checking + // just in case, because we call `write_vectored` on both. + self.buffer.is_write_vectored() && self.inner().is_write_vectored() } /// Write some data into this BufReader with line buffering. This means @@ -1039,37 +1075,37 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// writer, it will also flush the existing buffer if it contains any /// newlines, even if the incoming data does not contain any newlines. fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - match memchr::memrchr(b'\n', buf) { + let newline_idx = match memchr::memrchr(b'\n', buf) { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write None => { // Check for prior partial line writes that need to be retried. // Only retry if the buffer contains a completed line, to // avoid flushing partial lines. - if let Some(b'\n') = self.inner.buffer().last().copied() { - self.inner.flush_buf()?; + if let Some(b'\n') = self.buffered().last().copied() { + self.buffer.flush_buf()?; } - self.inner.write_all(buf) + return self.buffer.write_all(buf); } // Otherwise, arrange for the lines to be written directly to the // inner writer. - Some(newline_idx) => { - // Flush existing content to prepare for our write - self.inner.flush_buf()?; + Some(newline_idx) => newline_idx, + }; - // This is what we're going to try to write directly to the inner - // writer. The rest will be buffered, if nothing goes wrong. - let (lines, tail) = buf.split_at(newline_idx + 1); + // Flush existing content to prepare for our write + self.buffer.flush_buf()?; - // Write `lines` directly to the inner writer, bypassing the buffer. - self.inner.get_mut().write_all(lines)?; + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let (lines, tail) = buf.split_at(newline_idx + 1); - // Now that the write has succeeded, buffer the rest with BufWriter::write_all. - // This will buffer as much as possible, but continue flushing as - // necessary if our tail is huge. - self.inner.write_all(tail) - } - } + // Write `lines` directly to the inner writer, bypassing the buffer. + self.inner_mut().write_all(lines)?; + + // Now that the write has succeeded, buffer the rest with + // BufWriter::write_all. This will buffer as much as possible, but + // continue flushing as necessary if our tail is huge. + self.buffer.write_all(tail) } } @@ -1310,7 +1346,7 @@ where #[cfg(test)] mod tests { use crate::io::prelude::*; - use crate::io::{self, BufReader, BufWriter, IoSlice, LineWriter, SeekFrom}; + use crate::io::{self, BufReader, BufWriter, ErrorKind, IoSlice, LineWriter, SeekFrom}; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::thread; @@ -1598,34 +1634,6 @@ mod tests { assert_eq!(v, []); } - #[test] - fn test_line_buffer_fail_flush() { - // Issue #32085 - struct FailFlushWriter<'a>(&'a mut Vec); - - impl Write for FailFlushWriter<'_> { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Err(io::Error::new(io::ErrorKind::Other, "flush failed")) - } - } - - let mut buf = Vec::new(); - { - let mut writer = LineWriter::new(FailFlushWriter(&mut buf)); - let to_write = b"abc\ndef"; - if let Ok(written) = writer.write(to_write) { - assert!(written < to_write.len(), "didn't flush on new line"); - // PASS - return; - } - } - assert!(buf.is_empty(), "write returned an error but wrote data"); - } - #[test] fn test_line_buffer() { let mut writer = LineWriter::new(Vec::new()); @@ -1746,44 +1754,62 @@ mod tests { b.iter(|| BufWriter::new(io::sink())); } + /// A simple `Write` target, designed to be wrapped by `LineWriter` / + /// `BufWriter` / etc, that can have its `write` & `flush` behavior + /// configured #[derive(Default, Clone)] struct ProgrammableSink { // Writes append to this slice - buffer: Vec, + pub buffer: Vec, - // Flushes set this flag - flushed: bool, + // Flush sets this flag + pub flushed: bool, // If true, writes & flushes will always be an error - return_error: bool, + pub always_error: bool, // If set, only up to this number of bytes will be written in a single // call to `write` - accept_prefix: Option, + pub accept_prefix: Option, // If set, counts down with each write, and writes return an error // when it hits 0 - max_writes: Option, + pub max_writes: Option, + + // If set, attempting to write when max_writes == Some(0) will be an + // error; otherwise, it will return Ok(0). + pub error_after_max_writes: bool, } impl Write for ProgrammableSink { fn write(&mut self, data: &[u8]) -> io::Result { - if self.return_error { - Err(io::Error::new(io::ErrorKind::Other, "test")) - } else { - let len = match self.accept_prefix { - None => data.len(), - Some(prefix) => prefix.min(prefix), - }; - let data = &data[..len]; - self.buffer.extend_from_slice(data); - Ok(len) + if self.always_error { + return Err(io::Error::new(io::ErrorKind::Other, "test - write always_error")); + } + + match self.max_writes { + Some(0) if self.error_after_max_writes => { + return Err(io::Error::new(io::ErrorKind::Other, "test - max_writes")) + } + Some(0) => return Ok(0), + Some(ref mut count) => *count -= 1, + None => {} } + + let len = match self.accept_prefix { + None => data.len(), + Some(prefix) => data.len().min(prefix), + }; + + let data = &data[..len]; + self.buffer.extend_from_slice(data); + + Ok(len) } fn flush(&mut self) -> io::Result<()> { - if self.return_error { - Err(io::Error::new(io::ErrorKind::Other, "test")) + if self.always_error { + Err(io::Error::new(io::ErrorKind::Other, "test - flush always_error")) } else { self.flushed = true; Ok(()) @@ -1802,7 +1828,27 @@ mod tests { /// Regression test for #37807 #[test] fn erroneous_flush_retried() { - todo!() + let writer = ProgrammableSink { + // Only write up to 4 bytes at a time + accept_prefix: Some(4), + + // Accept the first two writes, then error the others + max_writes: Some(2), + error_after_max_writes: true, + + ..Default::default() + }; + + // This should write the first 4 bytes. The rest will be buffered, out + // to the last newline. + let mut writer = LineWriter::new(writer); + assert_eq!(writer.write(b"a\nb\nc\nd\ne").unwrap(), 8); + + // This write should attempt to flush "c\nd\n", then buffer "e". No + // errors should happen here because no further writes should be + // attempted against `writer`. + assert_eq!(writer.write(b"e").unwrap(), 1); + assert_eq!(&writer.get_ref().buffer, b"a\nb\nc\nd\n"); } #[test] @@ -1927,7 +1973,14 @@ mod tests { /// This behavior is desirable because it prevents flushing partial lines #[test] fn test_partial_write_buffers_line() { - todo!() + let writer = ProgrammableSink { accept_prefix: Some(13), ..Default::default() }; + let mut writer = LineWriter::new(writer); + + assert_eq!(writer.write(b"Line 1\nLine 2\nLine 3\nLine4").unwrap(), 21); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2"); + + assert_eq!(writer.write(b"Line 4").unwrap(), 6); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); } /// Test that, given this input: @@ -1940,7 +1993,14 @@ mod tests { /// That data up to Line 3 is buffered #[test] fn test_partial_line_buffered_after_line_write() { - todo!() + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::new(writer); + + assert_eq!(writer.write(b"Line 1\nLine 2\nLine 3").unwrap(), 20); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\n"); + + assert!(writer.flush().is_ok()); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3"); } /// Test that, given a partial line that exceeds the length of @@ -1948,14 +2008,74 @@ mod tests { /// line is written to the inner writer #[test] fn test_long_line_flushed() { - todo!() + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::with_capacity(5, writer); + + assert_eq!(writer.write(b"0123456789").unwrap(), 10); + assert_eq!(&writer.get_ref().buffer, b"0123456789"); } /// Test that, given a very long partial line *after* successfully /// flushing a complete line, that that line is buffered unconditionally, - /// and no additional writes take place + /// and no additional writes take place. This assures the property that + /// `write` should make at-most-one attempt to write new data. #[test] fn test_long_tail_not_flushed() { - todo!() + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::with_capacity(5, writer); + + // Assert that Line 1\n is flushed, and 01234 is buffered + assert_eq!(writer.write(b"Line 1\n0123456789").unwrap(), 12); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // Because the buffer is full, this subsequent write will flush it + assert_eq!(writer.write(b"5").unwrap(), 1); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n01234"); + } + + /// Test that, if an attempt to pre-flush buffered data returns Ok(0), + /// this is propagated as an error. + #[test] + fn test_line_buffer_write0_error() { + let writer = ProgrammableSink { + // Accept one write, then return Ok(0) on subsequent ones + max_writes: Some(1), + + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + // This should write "Line 1\n" and buffer "Partial" + assert_eq!(writer.write(b"Line 1\nPartial").unwrap(), 14); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // This will attempt to flush "partial", which will return Ok(0), which + // needs to be an error, because we've already informed the client + // that we accepted the write. + let err = writer.write(b" Line End\n").unwrap_err(); + assert_eq!(err.kind(), ErrorKind::WriteZero); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + } + + /// Test that, if a write returns Ok(0) after a successful pre-flush, this + /// is propogated as Ok(0) + #[test] + fn test_line_buffer_write0_normal() { + let writer = ProgrammableSink { + // Accept two writes, then return Ok(0) on subsequent ones + max_writes: Some(2), + + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + // This should write "Line 1\n" and buffer "Partial" + assert_eq!(writer.write(b"Line 1\nPartial").unwrap(), 14); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // This will flush partial, which will succeed, but then return Ok(0) + // when flushing " Line End\n" + assert_eq!(writer.write(b" Line End\n").unwrap(), 0); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nPartial"); } } From b6296e88f0176255e67672a70c9761729e20f33f Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 00:43:06 -0400 Subject: [PATCH 10/31] Tons of testing updates, other minor changes - Cleaned up BufWriter::seek - Updated line_vectored test - Updated line_vectored_partial_and_errors test - Added several new tests --- src/libstd/io/buffered.rs | 130 +++++++++++++++++++++++++++++++------- 1 file changed, 106 insertions(+), 24 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 730ae8a0ee471..ced84b777ce77 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -751,7 +751,8 @@ impl Seek for BufWriter { /// /// Seeking always writes out the internal buffer before seeking. fn seek(&mut self, pos: SeekFrom) -> io::Result { - self.flush_buf().and_then(|_| self.get_mut().seek(pos)) + self.flush_buf()?; + self.get_mut().seek(pos) } } @@ -1862,12 +1863,13 @@ mod tests { IoSlice::new(b"a"), ]) .unwrap(), - 2, + 1, ); assert_eq!(a.get_ref(), b"\n"); assert_eq!( a.write_vectored(&[ + IoSlice::new(b"a"), IoSlice::new(&[]), IoSlice::new(b"b"), IoSlice::new(&[]), @@ -1876,7 +1878,7 @@ mod tests { IoSlice::new(b"c"), ]) .unwrap(), - 3, + 4, ); assert_eq!(a.get_ref(), b"\n"); a.flush().unwrap(); @@ -1893,17 +1895,21 @@ mod tests { 0, ); assert_eq!(a.write_vectored(&[IoSlice::new(b"a\nb"),]).unwrap(), 3); - assert_eq!(a.get_ref(), b"\nabaca\n"); + assert_eq!(a.get_ref(), b"\nabaca\nb"); } #[test] fn line_vectored_partial_and_errors() { + use crate::collections::VecDeque; + enum Call { Write { inputs: Vec<&'static [u8]>, output: io::Result }, Flush { output: io::Result<()> }, } + + #[derive(Default)] struct Writer { - calls: Vec, + calls: VecDeque, } impl Write for Writer { @@ -1912,19 +1918,23 @@ mod tests { } fn write_vectored(&mut self, buf: &[IoSlice<'_>]) -> io::Result { - match self.calls.pop().unwrap() { + match self.calls.pop_front().expect("unexpected call to write") { Call::Write { inputs, output } => { assert_eq!(inputs, buf.iter().map(|b| &**b).collect::>()); output } - _ => panic!("unexpected call to write"), + Call::Flush { .. } => panic!("unexpected call to write; expected a flush"), } } + fn is_write_vectored(&self) -> bool { + true + } + fn flush(&mut self) -> io::Result<()> { - match self.calls.pop().unwrap() { + match self.calls.pop_front().expect("Unexpected call to flush") { Call::Flush { output } => output, - _ => panic!("unexpected call to flush"), + Call::Write { .. } => panic!("unexpected call to flush; expected a write"), } } } @@ -1938,20 +1948,22 @@ mod tests { } // partial writes keep going - let mut a = LineWriter::new(Writer { calls: Vec::new() }); + let mut a = LineWriter::new(Writer::default()); a.write_vectored(&[IoSlice::new(&[]), IoSlice::new(b"abc")]).unwrap(); - a.get_mut().calls.push(Call::Flush { output: Ok(()) }); - a.get_mut().calls.push(Call::Write { inputs: vec![b"bcx\n"], output: Ok(4) }); - a.get_mut().calls.push(Call::Write { inputs: vec![b"abcx\n"], output: Ok(1) }); + + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"abc"], output: Ok(1) }); + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"bc"], output: Ok(2) }); + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"x", b"\n"], output: Ok(2) }); + a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\n")]).unwrap(); - a.get_mut().calls.push(Call::Flush { output: Ok(()) }); + + a.get_mut().calls.push_back(Call::Flush { output: Ok(()) }); a.flush().unwrap(); // erroneous writes stop and don't write more - a.get_mut().calls.push(Call::Write { inputs: vec![b"x\n"], output: Err(err()) }); - assert_eq!(a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\na")]).unwrap(), 2); - a.get_mut().calls.push(Call::Flush { output: Ok(()) }); - a.get_mut().calls.push(Call::Write { inputs: vec![b"x\n"], output: Ok(2) }); + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"x", b"\na"], output: Err(err()) }); + a.get_mut().calls.push_back(Call::Flush { output: Ok(()) }); + assert!(a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\na")]).is_err()); a.flush().unwrap(); fn err() -> io::Error { @@ -1959,6 +1971,41 @@ mod tests { } } + /// Test that, in cases where vectored writing is not enabled, the + /// LineWriter uses the normal `write` call, which more-corectly handles + /// partial lines + #[test] + fn line_vectored_ignored() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::new(writer); + + let content = [ + IoSlice::new(b"Line 1\nLine"), + IoSlice::new(b" 2\nLine 3\nL"), + IoSlice::new(b"ine 4"), + IoSlice::new(b"\nLine 5\n"), + ]; + + let count = writer.write_vectored(&content).unwrap(); + assert_eq!(count, 11); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + let count = writer.write_vectored(&content[1..]).unwrap(); + assert_eq!(count, 11); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); + + let count = writer.write_vectored(&content[2..]).unwrap(); + assert_eq!(count, 5); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); + + let count = writer.write_vectored(&content[3..]).unwrap(); + assert_eq!(count, 8); + assert_eq!( + writer.get_ref().buffer.as_slice(), + b"Line 1\nLine 2\nLine 3\nLine 4\n Line 5".as_ref() + ); + } + /// Test that, given this input: /// /// Line 1\n @@ -1972,7 +2019,7 @@ mod tests { /// /// This behavior is desirable because it prevents flushing partial lines #[test] - fn test_partial_write_buffers_line() { + fn partial_write_buffers_line() { let writer = ProgrammableSink { accept_prefix: Some(13), ..Default::default() }; let mut writer = LineWriter::new(writer); @@ -1992,7 +2039,7 @@ mod tests { /// And given that the full write of lines 1 and 2 was successful /// That data up to Line 3 is buffered #[test] - fn test_partial_line_buffered_after_line_write() { + fn partial_line_buffered_after_line_write() { let writer = ProgrammableSink::default(); let mut writer = LineWriter::new(writer); @@ -2007,7 +2054,7 @@ mod tests { /// LineBuffer's buffer (that is, without a trailing newline), that that /// line is written to the inner writer #[test] - fn test_long_line_flushed() { + fn long_line_flushed() { let writer = ProgrammableSink::default(); let mut writer = LineWriter::with_capacity(5, writer); @@ -2020,7 +2067,7 @@ mod tests { /// and no additional writes take place. This assures the property that /// `write` should make at-most-one attempt to write new data. #[test] - fn test_long_tail_not_flushed() { + fn line_long_tail_not_flushed() { let writer = ProgrammableSink::default(); let mut writer = LineWriter::with_capacity(5, writer); @@ -2036,7 +2083,7 @@ mod tests { /// Test that, if an attempt to pre-flush buffered data returns Ok(0), /// this is propagated as an error. #[test] - fn test_line_buffer_write0_error() { + fn line_buffer_write0_error() { let writer = ProgrammableSink { // Accept one write, then return Ok(0) on subsequent ones max_writes: Some(1), @@ -2060,7 +2107,7 @@ mod tests { /// Test that, if a write returns Ok(0) after a successful pre-flush, this /// is propogated as Ok(0) #[test] - fn test_line_buffer_write0_normal() { + fn line_buffer_write0_normal() { let writer = ProgrammableSink { // Accept two writes, then return Ok(0) on subsequent ones max_writes: Some(2), @@ -2078,4 +2125,39 @@ mod tests { assert_eq!(writer.write(b" Line End\n").unwrap(), 0); assert_eq!(&writer.get_ref().buffer, b"Line 1\nPartial"); } + + /// LineWriter has a custom `write_all`; make sure it works correctly + #[test] + fn line_write_all() { + let writer = ProgrammableSink { + // Only write 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + writer.write_all(b"Line 1\nLine 2\nLine 3\nLine 4\nPartial").unwrap(); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\nLine 4\n"); + writer.write_all(b" Line 5\n").unwrap(); + assert_eq!( + writer.get_ref().buffer.as_slice(), + b"Line 1\nLine 2\nLine 3\nLine 4\nPartial Line 5\n".as_ref(), + ); + } + + #[test] + fn line_write_all_error() { + let writer = ProgrammableSink { + // Only accept up to 3 writes of up to 5 bytes each + accept_prefix: Some(5), + max_writes: Some(3), + ..Default::default() + }; + + let mut writer = LineWriter::new(writer); + let res = writer.write_all(b"Line 1\nLine 2\nLine 3\nLine 4\nPartial"); + assert!(res.is_err()); + // An error from write_all leaves everything in an indeterminate state, + // so there's nothing else to test here + } } From e022d3452de74b5596810f0aa26193e262de725b Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 00:44:16 -0400 Subject: [PATCH 11/31] Fixed typo in test --- src/libstd/io/buffered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index ced84b777ce77..abfc2fc37c4f9 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -2002,7 +2002,7 @@ mod tests { assert_eq!(count, 8); assert_eq!( writer.get_ref().buffer.as_slice(), - b"Line 1\nLine 2\nLine 3\nLine 4\n Line 5".as_ref() + b"Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n".as_ref() ); } From e4328ae54573aa42051246b675bf5280ea58eddf Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 01:01:30 -0400 Subject: [PATCH 12/31] Code review updates: all minor style fixes - Renamed write_to_buffer to write_to_buf, for consistency - Fixed references to flush_buf - Optimized `write` to use one less `memchr` call --- src/libstd/io/buffered.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index abfc2fc37c4f9..7b7c8c6e3177a 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -565,7 +565,7 @@ impl BufWriter { /// data. Writes as much as possible without exceeding capacity. Returns /// the number of bytes written. #[inline] - fn write_to_buffer(&mut self, buf: &[u8]) -> usize { + fn write_to_buf(&mut self, buf: &[u8]) -> usize { let available = self.buf.capacity() - self.buf.len(); let amt_to_buffer = available.min(buf.len()); self.buf.extend_from_slice(&buf[..amt_to_buffer]); @@ -689,7 +689,7 @@ impl Write for BufWriter { self.panicked = false; r } else { - Ok(self.write_to_buffer(buf)) + Ok(self.write_to_buf(buf)) } } @@ -703,7 +703,7 @@ impl Write for BufWriter { self.panicked = false; r } else { - self.write_to_buffer(buf); + self.write_to_buf(buf); Ok(()) } } @@ -861,7 +861,7 @@ impl fmt::Display for IntoInnerError { /// Private helper struct for implementing the line-buffered writing logic. /// This shim temporarily wraps a BufWriter, and uses its internals to /// implement a line-buffered writer (specifically by using the internal -/// methods like write_to_buffer and flush_buffer). In this way, a more +/// methods like write_to_buf and flush_buf). In this way, a more /// efficient abstraction can be created than one that only had access to /// `write` and `flush`, without needlessly duplicating a lot of the /// implementation details of BufWriter. This also allows existing @@ -925,7 +925,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { } // Otherwise, arrange for the lines to be written directly to the // inner writer. - Some(newline_idx) => newline_idx, + Some(newline_idx) => newline_idx + 1, }; // Flush existing content to prepare for our write @@ -933,7 +933,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // This is what we're going to try to write directly to the inner // writer. The rest will be buffered, if nothing goes wrong. - let lines = &buf[..newline_idx + 1]; + let lines = &buf[..newline_idx]; // Write `lines` directly to the inner writer. In keeping with the // `write` convention, make at most one attempt to add new (unbuffered) @@ -953,11 +953,10 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // the rest as possible). If there were any unwritten newlines, we // only buffer out to the last unwritten newline; this helps prevent // flushing partial lines on subsequent calls to LineWriterShim::write. - let tail = &buf[flushed..]; - let buffered = match memchr::memrchr(b'\n', tail) { - None => self.buffer.write_to_buffer(tail), - Some(i) => self.buffer.write_to_buffer(&tail[..i + 1]), - }; + let tail = + if flushed < newline_idx { &buf[flushed..newline_idx] } else { &buf[newline_idx..] }; + + let buffered = self.buffer.write_to_buf(tail); Ok(flushed + buffered) } @@ -1056,7 +1055,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // Now that the write has succeeded, buffer the rest (or as much of the // rest as possible) let buffered: usize = - tail.iter().map(|buf| self.buffer.write_to_buffer(buf)).take_while(|&n| n > 0).sum(); + tail.iter().map(|buf| self.buffer.write_to_buf(buf)).take_while(|&n| n > 0).sum(); Ok(flushed + buffered) } @@ -1076,6 +1075,9 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// writer, it will also flush the existing buffer if it contains any /// newlines, even if the incoming data does not contain any newlines. fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + // The default `write_all` calls `write` in a loop; we can do better + // by simply calling self.inner().write_all directly. This avoids + // round trips to `self.buffer` in the event of partial writes. let newline_idx = match memchr::memrchr(b'\n', buf) { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write From f7650fe3f955863e810ef7816edbe78f88393d87 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 01:07:34 -0400 Subject: [PATCH 13/31] Add comment --- src/libstd/io/buffered.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 7b7c8c6e3177a..b9839c318b75a 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -1358,6 +1358,9 @@ mod tests { lengths: Vec, } + // FIXME: rustfmt and tidy disagree about the correct formatting of this + // function. This leads to issues for users with editors configured to + // rustfmt-on-save. impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { if self.lengths.is_empty() { From 7a6a12bdf42804e2e2ef6437bed6a45a1ede51b2 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 01:09:55 -0400 Subject: [PATCH 14/31] Tidy fixes --- src/libstd/io/buffered.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index b9839c318b75a..560c8f2369846 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -1363,11 +1363,7 @@ mod tests { // rustfmt-on-save. impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { - Ok(0) - } else { - Ok(self.lengths.remove(0)) - } + if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } } } @@ -1795,7 +1791,7 @@ mod tests { match self.max_writes { Some(0) if self.error_after_max_writes => { - return Err(io::Error::new(io::ErrorKind::Other, "test - max_writes")) + return Err(io::Error::new(io::ErrorKind::Other, "test - max_writes")); } Some(0) => return Ok(0), Some(ref mut count) => *count -= 1, From 338a2c02e4b9cc5b1fae698ab019724340b26967 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 01:28:31 -0400 Subject: [PATCH 15/31] Reimplement flush_buf with a Guard. Longer, but cleaner. --- src/libstd/io/buffered.rs | 67 +++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 560c8f2369846..b7e848b04d721 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -448,6 +448,9 @@ impl Seek for BufReader { /// [`flush`]: #method.flush #[stable(feature = "rust1", since = "1.0.0")] pub struct BufWriter { + // FIXME: Can this just be W, instead of Option? I don't see any code + // paths that lead to this being None, or that ever check if it IS none, + // even in drop implementations. inner: Option, // FIXME: Replace this with a VecDeque. Because VecDeque is a Ring buffer, // this would enable BufWriter to operate without any interior copies. @@ -533,32 +536,62 @@ impl BufWriter { /// `write`), any 0-length writes from `inner` must be reported as i/o /// errors from this method. fn flush_buf(&mut self) -> io::Result<()> { - let mut written = 0; - let len = self.buf.len(); - let mut ret = Ok(()); - while written < len { + /// Helper struct to ensure the buffer is updated after all the writes + /// are complete + struct BufGuard<'a> { + buffer: &'a mut Vec, + written: usize, + } + + impl<'a> BufGuard<'a> { + fn new(buffer: &'a mut Vec) -> Self { + Self { buffer, written: 0 } + } + + /// The unwritten part of the buffer + fn remaining(&self) -> &[u8] { + &self.buffer[self.written..] + } + + /// Flag some bytes as removed from the front of the buffer + fn consume(&mut self, amt: usize) { + self.written += amt; + } + + /// true if all of the bytes have been written + fn done(&self) -> bool { + self.written >= self.buffer.len() + } + } + + impl Drop for BufGuard<'_> { + fn drop(&mut self) { + if self.written > 0 { + self.buffer.drain(..self.written); + } + } + } + + let mut guard = BufGuard::new(&mut self.buf); + let inner = self.inner.as_mut().unwrap(); + while !guard.done() { self.panicked = true; - let r = self.inner.as_mut().unwrap().write(&self.buf[written..]); + let r = inner.write(guard.remaining()); self.panicked = false; match r { Ok(0) => { - ret = - Err(Error::new(ErrorKind::WriteZero, "failed to write the buffered data")); - break; + return Err(Error::new( + ErrorKind::WriteZero, + "failed to write the buffered data", + )) } - Ok(n) => written += n, + Ok(n) => guard.consume(n), Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => { - ret = Err(e); - break; - } + Err(e) => return Err(e), } } - if written > 0 { - self.buf.drain(..written); - } - ret + Ok(()) } /// Buffer some data without flushing it, regardless of the size of the From c869638cba8e9ab8392016b11675706c4e514b6e Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 01:30:40 -0400 Subject: [PATCH 16/31] Added comment about BufWrite::write_all --- src/libstd/io/buffered.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index b7e848b04d721..f9bde7897e997 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -727,6 +727,10 @@ impl Write for BufWriter { } fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + // Normally, `write_all` just calls `write` in a loop. We can do better + // by calling `self.get_mut().write_all()` directly, which avoids + // round trips through the buffer in the event of a series of partial + // writes. if self.buf.len() + buf.len() > self.buf.capacity() { self.flush_buf()?; } From 61f591e1738845911048ff43d956bb9d39987910 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 01:36:40 -0400 Subject: [PATCH 17/31] Improved line_vectored_ignored. Added stylistic semicolon. --- src/libstd/io/buffered.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index f9bde7897e997..02615bffe3bd1 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -584,7 +584,7 @@ impl BufWriter { return Err(Error::new( ErrorKind::WriteZero, "failed to write the buffered data", - )) + )); } Ok(n) => guard.consume(n), Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} @@ -2018,8 +2018,11 @@ mod tests { let mut writer = LineWriter::new(writer); let content = [ + IoSlice::new(&[]), IoSlice::new(b"Line 1\nLine"), IoSlice::new(b" 2\nLine 3\nL"), + IoSlice::new(&[]), + IoSlice::new(&[]), IoSlice::new(b"ine 4"), IoSlice::new(b"\nLine 5\n"), ]; @@ -2028,15 +2031,15 @@ mod tests { assert_eq!(count, 11); assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); - let count = writer.write_vectored(&content[1..]).unwrap(); + let count = writer.write_vectored(&content[2..]).unwrap(); assert_eq!(count, 11); assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); - let count = writer.write_vectored(&content[2..]).unwrap(); + let count = writer.write_vectored(&content[5..]).unwrap(); assert_eq!(count, 5); assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); - let count = writer.write_vectored(&content[3..]).unwrap(); + let count = writer.write_vectored(&content[6..]).unwrap(); assert_eq!(count, 8); assert_eq!( writer.get_ref().buffer.as_slice(), From 2d22c7741816aa391f619f31b14b1ca01cc31b61 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 01:43:10 -0400 Subject: [PATCH 18/31] Fixed bug in write_vectored & empty buffers --- src/libstd/io/buffered.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 02615bffe3bd1..5189b39aaae68 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -1091,8 +1091,12 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // Now that the write has succeeded, buffer the rest (or as much of the // rest as possible) - let buffered: usize = - tail.iter().map(|buf| self.buffer.write_to_buf(buf)).take_while(|&n| n > 0).sum(); + let buffered: usize = tail + .iter() + .filter(|buf| !buf.is_empty()) + .map(|buf| self.buffer.write_to_buf(buf)) + .take_while(|&n| n > 0) + .sum(); Ok(flushed + buffered) } From 2c23b9066f761e07f1c4db6ca8d8f0bf48e4e296 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 11:56:02 -0400 Subject: [PATCH 19/31] Comment updates --- src/libstd/io/buffered.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 5189b39aaae68..51098f6ff6af5 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -730,7 +730,7 @@ impl Write for BufWriter { // Normally, `write_all` just calls `write` in a loop. We can do better // by calling `self.get_mut().write_all()` directly, which avoids // round trips through the buffer in the event of a series of partial - // writes. + // writes in some circumstances. if self.buf.len() + buf.len() > self.buf.capacity() { self.flush_buf()?; } @@ -1116,9 +1116,6 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// writer, it will also flush the existing buffer if it contains any /// newlines, even if the incoming data does not contain any newlines. fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - // The default `write_all` calls `write` in a loop; we can do better - // by simply calling self.inner().write_all directly. This avoids - // round trips to `self.buffer` in the event of partial writes. let newline_idx = match memchr::memrchr(b'\n', buf) { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write From e999ca5ac3a21183fdc8cc99350752d0f1ef0bdd Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 12:00:12 -0400 Subject: [PATCH 20/31] Remove inline from write_to_buf --- src/libstd/io/buffered.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 51098f6ff6af5..a66fa6ea36a28 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -597,7 +597,6 @@ impl BufWriter { /// Buffer some data without flushing it, regardless of the size of the /// data. Writes as much as possible without exceeding capacity. Returns /// the number of bytes written. - #[inline] fn write_to_buf(&mut self, buf: &[u8]) -> usize { let available = self.buf.capacity() - self.buf.len(); let amt_to_buffer = available.min(buf.len()); From 70ba320052669c06e2285d16b4a36af83f9c83da Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 12:40:05 -0400 Subject: [PATCH 21/31] More minor changes - Fixed test after write_vectored bugfix - Some comments --- src/libstd/io/buffered.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index a66fa6ea36a28..c599bd8b55127 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -715,6 +715,7 @@ impl Write for BufWriter { if self.buf.len() + buf.len() > self.buf.capacity() { self.flush_buf()?; } + // FIXME: Why no len > capacity? Why not buffer len == capacity? if buf.len() >= self.buf.capacity() { self.panicked = true; let r = self.get_mut().write(buf); @@ -733,6 +734,7 @@ impl Write for BufWriter { if self.buf.len() + buf.len() > self.buf.capacity() { self.flush_buf()?; } + // FIXME: Why no len > capacity? Why not buffer len == capacity? if buf.len() >= self.buf.capacity() { self.panicked = true; let r = self.get_mut().write_all(buf); @@ -749,6 +751,7 @@ impl Write for BufWriter { if self.buf.len() + total_len > self.buf.capacity() { self.flush_buf()?; } + // FIXME: Why no len > capacity? Why not buffer len == capacity? if total_len >= self.buf.capacity() { self.panicked = true; let r = self.get_mut().write_vectored(bufs); @@ -1901,13 +1904,12 @@ mod tests { IoSlice::new(b"a"), ]) .unwrap(), - 1, + 2, ); assert_eq!(a.get_ref(), b"\n"); assert_eq!( a.write_vectored(&[ - IoSlice::new(b"a"), IoSlice::new(&[]), IoSlice::new(b"b"), IoSlice::new(&[]), @@ -1916,7 +1918,7 @@ mod tests { IoSlice::new(b"c"), ]) .unwrap(), - 4, + 3, ); assert_eq!(a.get_ref(), b"\n"); a.flush().unwrap(); From 5b1a40c18168a8049eb9f3af3178ec1846e64730 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 12:58:49 -0400 Subject: [PATCH 22/31] BufWriter::write* methods now use fewer runtime checks --- src/libstd/io/buffered.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index c599bd8b55127..1998cdde8d08e 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -722,7 +722,8 @@ impl Write for BufWriter { self.panicked = false; r } else { - Ok(self.write_to_buf(buf)) + self.buf.extend_from_slice(buf); + Ok(buf.len()) } } @@ -741,7 +742,7 @@ impl Write for BufWriter { self.panicked = false; r } else { - self.write_to_buf(buf); + self.buf.extend_from_slice(buf); Ok(()) } } @@ -758,7 +759,8 @@ impl Write for BufWriter { self.panicked = false; r } else { - self.buf.write_vectored(bufs) + bufs.iter().for_each(|b| self.buf.extend_from_slice(b)); + Ok(total_len) } } @@ -1403,7 +1405,11 @@ mod tests { // rustfmt-on-save. impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } } } From 8df5ae0fffc9de2884e0c916bdcd74cb69949b1c Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 14:36:03 -0400 Subject: [PATCH 23/31] x.py fix AGAIN --- src/libstd/io/buffered.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 1998cdde8d08e..61ad5d0c274ff 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -1405,11 +1405,7 @@ mod tests { // rustfmt-on-save. impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { - Ok(0) - } else { - Ok(self.lengths.remove(0)) - } + if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } } } From 60ab99f9bdcb33ea025aca6a94e34f2ae5b5b75e Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 15:57:20 -0400 Subject: [PATCH 24/31] Fixed corner case related to partial-line buffering - Fixed partial-line buffering issue - Added tests Thanks @the8472 for catching! --- src/libstd/io/buffered.rs | 93 +++++++++++++++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 8 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 61ad5d0c274ff..d094f19b5a5ff 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -994,8 +994,26 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // the rest as possible). If there were any unwritten newlines, we // only buffer out to the last unwritten newline; this helps prevent // flushing partial lines on subsequent calls to LineWriterShim::write. - let tail = - if flushed < newline_idx { &buf[flushed..newline_idx] } else { &buf[newline_idx..] }; + + // Handle the cases in order of most-common to least-common, under + // the presumption that most writes succeed in totality, and that most + // writes are smaller than the buffer. + // - Is this a partial line (ie, no newlines left in the unwritten tail) + // - If not, does the data out to the last unwritten newline fit in + // the buffer? + // - If not, scan for the last newline that *does* fit in the buffer + let tail = if flushed >= newline_idx { + &buf[flushed..] + } else if newline_idx - flushed <= self.buffer.capacity() { + &buf[flushed..newline_idx] + } else { + let scan_area = &buf[flushed..]; + let scan_area = &scan_area[..self.buffer.capacity()]; + match memchr::memrchr(b'\n', scan_area) { + Some(newline_idx) => &scan_area[..newline_idx + 1], + None => scan_area, + } + }; let buffered = self.buffer.write_to_buf(tail); Ok(flushed + buffered) @@ -1809,8 +1827,11 @@ mod tests { // Flush sets this flag pub flushed: bool, - // If true, writes & flushes will always be an error - pub always_error: bool, + // If true, writes will always be an error + pub always_write_error: bool, + + // If true, flushes will always be an error + pub always_flush_error: bool, // If set, only up to this number of bytes will be written in a single // call to `write` @@ -1827,8 +1848,8 @@ mod tests { impl Write for ProgrammableSink { fn write(&mut self, data: &[u8]) -> io::Result { - if self.always_error { - return Err(io::Error::new(io::ErrorKind::Other, "test - write always_error")); + if self.always_write_error { + return Err(io::Error::new(io::ErrorKind::Other, "test - always_write_error")); } match self.max_writes { @@ -1852,8 +1873,8 @@ mod tests { } fn flush(&mut self) -> io::Result<()> { - if self.always_error { - Err(io::Error::new(io::ErrorKind::Other, "test - flush always_error")) + if self.always_flush_error { + Err(io::Error::new(io::ErrorKind::Other, "test - always_flush_error")) } else { self.flushed = true; Ok(()) @@ -2205,4 +2226,60 @@ mod tests { // An error from write_all leaves everything in an indeterminate state, // so there's nothing else to test here } + + /// Under certain circumstances, the old implementation of LineWriter + /// would try to buffer "to the last newline" but be forced to buffer + /// less than that, leading to inappropriate partial line writes. + /// Regression test for that issue. + #[test] + fn partial_multiline_buffering() { + let writer = ProgrammableSink { + // Write only up to 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + + let mut writer = LineWriter::with_capacity(10, writer); + + let content = b"AAAAABBBBB\nCCCCDDDDDD\nEEE"; + + // When content is written, LineWriter will try to write blocks A, B, + // C, and D. Only block A will succeed. Under the old behavior, LineWriter + // would then try to buffer B, C and D, but because its capacity is 10, + // it will only be able to buffer B and C. We don't want it to buffer + // partial lines if it can avoid it, so the correct behavior is to + // only buffer block B (with its newline). + assert_eq!(writer.write(content).unwrap(), 11); + assert_eq!(writer.get_ref().buffer, *b"AAAAA"); + + writer.flush().unwrap(); + assert_eq!(writer.get_ref().buffer, *b"AAAAABBBBB\n"); + } + + /// Same as test_partial_multiline_buffering, but in the event NO full lines + /// fit in the buffer, just buffer as much as possible + #[test] + fn partial_multiline_buffering_without_full_line() { + let writer = ProgrammableSink { + // Write only up to 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + + let mut writer = LineWriter::with_capacity(5, writer); + + let content = b"AAAAABBBBBBBBBB\nCCCCC\nDDDDD"; + + // When content is written, LineWriter will try to write blocks A, B, + // and C. Only block A will succeed. Under the old behavior, LineWriter + // would then try to buffer B and C, but because its capacity is 5, + // it will only be able to buffer part of B. Because it's not possible + // for it to buffer any complete lines, it should buffer as much of B as + // possible + assert_eq!(writer.write(content).unwrap(), 10); + assert_eq!(writer.get_ref().buffer, *b"AAAAA"); + + writer.flush().unwrap(); + assert_eq!(writer.get_ref().buffer, *b"AAAAABBBBB"); + } } From 38017a31e7728c359487817008c913a8e461a857 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 16:17:42 -0400 Subject: [PATCH 25/31] Update comments with relevant issue numbers --- src/libstd/io/buffered.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index d094f19b5a5ff..1048c9d078d86 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -450,7 +450,7 @@ impl Seek for BufReader { pub struct BufWriter { // FIXME: Can this just be W, instead of Option? I don't see any code // paths that lead to this being None, or that ever check if it IS none, - // even in drop implementations. + // even in drop implementations. #72925. inner: Option, // FIXME: Replace this with a VecDeque. Because VecDeque is a Ring buffer, // this would enable BufWriter to operate without any interior copies. @@ -715,7 +715,7 @@ impl Write for BufWriter { if self.buf.len() + buf.len() > self.buf.capacity() { self.flush_buf()?; } - // FIXME: Why no len > capacity? Why not buffer len == capacity? + // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 if buf.len() >= self.buf.capacity() { self.panicked = true; let r = self.get_mut().write(buf); @@ -735,7 +735,7 @@ impl Write for BufWriter { if self.buf.len() + buf.len() > self.buf.capacity() { self.flush_buf()?; } - // FIXME: Why no len > capacity? Why not buffer len == capacity? + // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 if buf.len() >= self.buf.capacity() { self.panicked = true; let r = self.get_mut().write_all(buf); @@ -752,7 +752,7 @@ impl Write for BufWriter { if self.buf.len() + total_len > self.buf.capacity() { self.flush_buf()?; } - // FIXME: Why no len > capacity? Why not buffer len == capacity? + // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 if total_len >= self.buf.capacity() { self.panicked = true; let r = self.get_mut().write_vectored(bufs); From 59710fb716d2810c6eada13275039274b6279065 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 2 Jun 2020 18:16:02 -0400 Subject: [PATCH 26/31] Clarified comment in `partial_multiline_buffering` test --- src/libstd/io/buffered.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 1048c9d078d86..c8c9ee4a4b47c 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -2246,9 +2246,9 @@ mod tests { // When content is written, LineWriter will try to write blocks A, B, // C, and D. Only block A will succeed. Under the old behavior, LineWriter // would then try to buffer B, C and D, but because its capacity is 10, - // it will only be able to buffer B and C. We don't want it to buffer - // partial lines if it can avoid it, so the correct behavior is to - // only buffer block B (with its newline). + // it will only be able to buffer B and C. We don't want to buffer + // partial lines concurrent with whole lines, so the correct behavior + // is to buffer only block B (out to the newline) assert_eq!(writer.write(content).unwrap(), 11); assert_eq!(writer.get_ref().buffer, *b"AAAAA"); From b8632e15483420784e9f4b95c882d24839dbada9 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sat, 11 Jul 2020 03:37:14 -0400 Subject: [PATCH 27/31] Removed FIXME --- src/libstd/io/buffered.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index c8c9ee4a4b47c..03a9fb91e5d96 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -448,9 +448,6 @@ impl Seek for BufReader { /// [`flush`]: #method.flush #[stable(feature = "rust1", since = "1.0.0")] pub struct BufWriter { - // FIXME: Can this just be W, instead of Option? I don't see any code - // paths that lead to this being None, or that ever check if it IS none, - // even in drop implementations. #72925. inner: Option, // FIXME: Replace this with a VecDeque. Because VecDeque is a Ring buffer, // this would enable BufWriter to operate without any interior copies. From 140bfc58aa9d193aeca1cce905a00195f8f23f3b Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sat, 11 Jul 2020 03:41:06 -0400 Subject: [PATCH 28/31] Removed another FIXME --- src/libstd/io/buffered.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 03a9fb91e5d96..2f2a67b0be96d 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -449,12 +449,6 @@ impl Seek for BufReader { #[stable(feature = "rust1", since = "1.0.0")] pub struct BufWriter { inner: Option, - // FIXME: Replace this with a VecDeque. Because VecDeque is a Ring buffer, - // this would enable BufWriter to operate without any interior copies. - // It was also allow a much simpler implementation of flush_buf. The main - // blocker here is that VecDeque doesn't currently have the same - // slice-specific specializations (extend_from_slice, `Extend` - // specializations) buf: Vec, // #30888: If the inner writer panics in a call to write, we don't want to // write the buffered data a second time in BufWriter's destructor. This From 997accc214f3a915541b06f3126568d367161882 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sat, 11 Jul 2020 14:45:22 -0400 Subject: [PATCH 29/31] Remove doubled "is_write_vectored" --- src/libstd/io/buffered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 2f2a67b0be96d..597bad0c2eeba 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -1117,7 +1117,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { fn is_write_vectored(&self) -> bool { // It's hard to imagine these diverging, but it's worth checking // just in case, because we call `write_vectored` on both. - self.buffer.is_write_vectored() && self.inner().is_write_vectored() + self.buffer.is_write_vectored() } /// Write some data into this BufReader with line buffering. This means From 6a7b5df55ffddd5a152ebbb865d490e52e93bec7 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sat, 11 Jul 2020 15:33:45 -0400 Subject: [PATCH 30/31] Removed unused method --- src/libstd/io/buffered.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index 597bad0c2eeba..cedc993b46147 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -909,12 +909,6 @@ impl<'a, W: Write> LineWriterShim<'a, W> { Self { buffer } } - /// Get a reference to the inner writer (that is, the writer wrapped by - /// the BufWriter) - fn inner(&self) -> &W { - self.buffer.get_ref() - } - /// Get a mutable reference to the inner writer (that is, the writer /// wrapped by the BufWriter). Be careful with this writer, as writes to /// it will bypass the buffer. From 606593fecec5510a32b6aa3b0bc2bd5cf81f28e2 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 12 Jul 2020 01:00:22 -0400 Subject: [PATCH 31/31] Minor updates - Remove outdated comment - Refactor flush-retry behavior into its own method - Some other comment updates --- src/libstd/io/buffered.rs | 40 +++++++++++++++++---------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/src/libstd/io/buffered.rs b/src/libstd/io/buffered.rs index cedc993b46147..7d9c33582bca1 100644 --- a/src/libstd/io/buffered.rs +++ b/src/libstd/io/buffered.rs @@ -920,6 +920,16 @@ impl<'a, W: Write> LineWriterShim<'a, W> { fn buffered(&self) -> &[u8] { self.buffer.buffer() } + + /// Flush the buffer iff the last byte is a newline (indicating that an + /// earlier write only succeeded partially, and we want to retry flushing + /// the buffered line before continuing with a subsequent write) + fn flush_if_completed_line(&mut self) -> io::Result<()> { + match self.buffered().last().copied() { + Some(b'\n') => self.buffer.flush_buf(), + _ => Ok(()), + } + } } impl<'a, W: Write> Write for LineWriterShim<'a, W> { @@ -941,12 +951,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write None => { - // Check for prior partial line writes that need to be retried. - // Only retry if the buffer contains a completed line, to - // avoid flushing partial lines. - if let Some(b'\n') = self.buffered().last().copied() { - self.buffer.flush_buf()?; - } + self.flush_if_completed_line()?; return self.buffer.write(buf); } // Otherwise, arrange for the lines to be written directly to the @@ -1025,9 +1030,10 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// Because sorting through an array of `IoSlice` can be a bit convoluted, /// This method differs from write in the following ways: /// - /// - It attempts to write all the buffers up to and including the one - /// containing the last newline. This means that it may attempt to - /// write a partial line. + /// - It attempts to write the full content of all the buffers up to and + /// including the one containing the last newline. This means that it + /// may attempt to write a partial line, that buffer has data past the + /// newline. /// - If the write only reports partial success, it does not attempt to /// find the precise location of the written bytes and buffer the rest. /// @@ -1057,12 +1063,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { let last_newline_buf_idx = match last_newline_buf_idx { // No newlines; just do a normal buffered write None => { - // Check for prior partial line writes that need to be retried. - // Only retry if the buffer contains a completed line, to - // avoid flushing partial lines. - if let Some(b'\n') = self.buffered().last().copied() { - self.buffer.flush_buf()?; - } + self.flush_if_completed_line()?; return self.buffer.write_vectored(bufs); } Some(i) => i, @@ -1109,8 +1110,6 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { } fn is_write_vectored(&self) -> bool { - // It's hard to imagine these diverging, but it's worth checking - // just in case, because we call `write_vectored` on both. self.buffer.is_write_vectored() } @@ -1127,12 +1126,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write None => { - // Check for prior partial line writes that need to be retried. - // Only retry if the buffer contains a completed line, to - // avoid flushing partial lines. - if let Some(b'\n') = self.buffered().last().copied() { - self.buffer.flush_buf()?; - } + self.flush_if_completed_line()?; return self.buffer.write_all(buf); } // Otherwise, arrange for the lines to be written directly to the