Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let io::copy reuse BufWriter buffers #78641

Merged
merged 1 commit into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion library/std/src/io/buffered/bufwriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<W: Write> BufWriter<W> {
/// "successfully written" (by returning nonzero success values from
/// `write`), any 0-length writes from `inner` must be reported as i/o
/// errors from this method.
pub(super) fn flush_buf(&mut self) -> io::Result<()> {
pub(in crate::io) fn flush_buf(&mut self) -> io::Result<()> {
/// Helper struct to ensure the buffer is updated after all the writes
/// are complete. It tracks the number of written bytes and drains them
/// all from the front of the buffer when dropped.
Expand Down Expand Up @@ -243,6 +243,18 @@ impl<W: Write> BufWriter<W> {
&self.buf
}

/// Returns a mutable reference to the internal buffer.
///
/// This can be used to write data directly into the buffer without triggering writers
/// to the underlying writer.
///
/// That the buffer is a `Vec` is an implementation detail.
/// Callers should not modify the capacity as there currently is no public API to do so
/// and thus any capacity changes would be unexpected by the user.
pub(in crate::io) fn buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.buf
}

/// Returns the number of bytes the internal buffer can hold without flushing.
///
/// # Examples
Expand Down
80 changes: 74 additions & 6 deletions library/std/src/io/copy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::io::{self, ErrorKind, Read, Write};
use super::{BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use crate::mem::MaybeUninit;

/// Copies the entire contents of a reader into a writer.
Expand Down Expand Up @@ -40,7 +40,7 @@ use crate::mem::MaybeUninit;
/// }
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
pub fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> Result<u64>
where
R: Read,
W: Write,
Expand All @@ -54,14 +54,82 @@ where
}
}

/// The general read-write-loop implementation of
/// `io::copy` that is used when specializations are not available or not applicable.
pub(crate) fn generic_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
/// The userspace read-write-loop implementation of `io::copy` that is used when
/// OS-specific specializations for copy offloading are not available or not applicable.
pub(crate) fn generic_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> Result<u64>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems strange for there to be specialization in the function used "when specializations are not available or not applicable".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That refers to another set of specializations. I'll try to reword it.

where
R: Read,
W: Write,
{
let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit();
BufferedCopySpec::copy_to(reader, writer)
}

/// Specialization of the read-write loop that either uses a stack buffer
/// or reuses the internal buffer of a BufWriter
trait BufferedCopySpec: Write {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64>;
}

impl<W: Write + ?Sized> BufferedCopySpec for W {
default fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
stack_buffer_copy(reader, writer)
}
}

impl<I: Write> BufferedCopySpec for BufWriter<I> {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
if writer.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, writer);
}

// FIXME: #42788
//
// - This creates a (mut) reference to a slice of
// _uninitialized_ integers, which is **undefined behavior**
//
// - Only the standard library gets to soundly "ignore" this,
// based on its privileged knowledge of unstable rustc
// internals;
unsafe {
let spare_cap = writer.buffer_mut().spare_capacity_mut();
reader.initializer().initialize(MaybeUninit::slice_assume_init_mut(spare_cap));
}

let mut len = 0;

loop {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this loop doesn't update the BufWriter's internal tracking of how much data is buffered, so use after copy returns will double write bits.

Rather than getting access to all of the raw bits of the writer's state, it seems like it would be cleaner to have APIs to get access to the unfilled part of the buffer, and to be able to write the buffer through (but not fully flush the inner writer) on demand.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as_raw_parts sets the len to 0 ahead of time and it's the caller's responsibility to write out the data. The doc comment on as_raw_parts says as much. If that is unclear I can try to improve the comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the reader returns an error there will be dangling data in the output buffer, and this approach requires flushing the buffer out to the inner writer regardless of how much data is in it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be more specific, I'm imagining roughly the inverse of BufRead, where there is a function to return the unfilled portion of the buffer, flushing if it happens to already be full to make space, and then a function to say that more data has been written into the buffer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a function that borrows the vec instead, which can be used for both.

let buf = writer.buffer_mut();
let spare_cap = buf.spare_capacity_mut();

if spare_cap.len() >= DEFAULT_BUF_SIZE {
match reader.read(unsafe { MaybeUninit::slice_assume_init_mut(spare_cap) }) {
Ok(0) => return Ok(len), // EOF reached
Ok(bytes_read) => {
assert!(bytes_read <= spare_cap.len());
// Safety: The initializer contract guarantees that either it or `read`
// will have initialized these bytes. And we just checked that the number
// of bytes is within the buffer capacity.
unsafe { buf.set_len(buf.len() + bytes_read) };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, is std being built with polonious?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what is causing surprise here. The x.foo(x.bar())? Isn't that just two-phase borrows?

len += bytes_read as u64;
// Read again if the buffer still has enough capacity, as BufWriter itself would do
// This will occur if the reader returns short reads
continue;
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}

writer.flush_buf()?;
Comment on lines +122 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think putting this in an else branch will be clearer, if the previous conditions are hit all will either continue or return, it does not reach this part.

}
}
}

fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
reader: &mut R,
writer: &mut W,
) -> Result<u64> {
let mut buf = MaybeUninit::<[u8; DEFAULT_BUF_SIZE]>::uninit();
// FIXME: #42788
//
// - This creates a (mut) reference to a slice of
Expand Down
50 changes: 49 additions & 1 deletion library/std/src/io/util/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::cmp::{max, min};
use crate::io::prelude::*;
use crate::io::{copy, empty, repeat, sink, Empty, Repeat, SeekFrom, Sink};
use crate::io::{
copy, empty, repeat, sink, BufWriter, Empty, Repeat, Result, SeekFrom, Sink, DEFAULT_BUF_SIZE,
};

#[test]
fn copy_copies() {
Expand All @@ -11,6 +14,51 @@ fn copy_copies() {
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}

struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}

impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}

struct WriteObserver {
observed_buffer: usize,
}

impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}

fn flush(&mut self) -> Result<()> {
Ok(())
}
}

#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}

#[test]
fn sink_sinks() {
let mut s = sink();
Expand Down
2 changes: 2 additions & 0 deletions library/std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@
#![feature(maybe_uninit_extra)]
#![feature(maybe_uninit_ref)]
#![feature(maybe_uninit_slice)]
#![feature(maybe_uninit_uninit_array)]
#![feature(min_specialization)]
#![feature(needs_panic_runtime)]
#![feature(negative_impls)]
Expand Down Expand Up @@ -326,6 +327,7 @@
#![feature(unsafe_cell_raw_get)]
#![feature(unwind_attributes)]
#![feature(vec_into_raw_parts)]
#![feature(vec_spare_capacity)]
#![feature(wake_trait)]
// NB: the above list is sorted to minimize merge conflicts.
#![default_lib_allocator]
Expand Down