Skip to content

Commit

Permalink
Auto merge of #112330 - the8472:use-buf-reader-buffer, r=thomcc
Browse files Browse the repository at this point in the history
Extend io::copy buffer reuse to BufReader too

previously it was only able to use BufWriter. This was due to a limitation in the BufReader generics that prevented specialization. This change works around the issue by using `BufReader where Self: Read` instead of `BufReader<I> where I: Read`. This limits our options, e.g. we can't access the inner reader, but it happens to work out if we rely on some implementation details.

Copying 1MiB from `/dev/zero` to `/dev/null` through a 256KiB BufReader yields following improvements

```
OLD:
    io::copy::tests::bench_copy_buf_reader  51.44µs/iter +/- 703.00ns
NEW:
    io::copy::tests::bench_copy_buf_reader  18.55µs/iter +/- 237.00ns
```

Previously this would read 256KiB into the reader but then copy 8KiB chunks to the writer through an additional intermediate buffer inside `io::copy`. Since those devices don't do much work most of the speedup should come from fewer syscalls and avoided memcopies.

The b3sum crate [notes that the default buffer size in io::copy is too small](https://github.com/BLAKE3-team/BLAKE3/blob/4108923f5284e0f8c3cf97b59041c2b6b2f601d3/b3sum/src/main.rs#L235-L239). With this optimization they could achieve the desired performance by wrapping the reader in a `BufReader` instead of handrolling it.

Currently the optimization doesn't apply to things like `StdinLock`, but this can be addressed with an additional `AsMutBufReader` specialization trait.
  • Loading branch information
bors committed Jun 17, 2023
2 parents f90d57d + 3738785 commit 7513407
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 74 deletions.
2 changes: 1 addition & 1 deletion library/std/src/io/buffered/bufreader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl<R: ?Sized> BufReader<R> {

/// Invalidates all data in the internal buffer.
#[inline]
fn discard_buffer(&mut self) {
pub(in crate::io) fn discard_buffer(&mut self) {
self.buf.discard_buffer()
}
}
Expand Down
110 changes: 97 additions & 13 deletions library/std/src/io/copy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use super::{BorrowedBuf, BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use super::{BorrowedBuf, BufReader, BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use crate::mem::MaybeUninit;

#[cfg(test)]
mod tests;

/// Copies the entire contents of a reader into a writer.
///
/// This function will continuously read data from `reader` and then
Expand Down Expand Up @@ -71,32 +74,113 @@ where
R: Read,
W: Write,
{
BufferedCopySpec::copy_to(reader, writer)
let read_buf = BufferedReaderSpec::buffer_size(reader);
let write_buf = BufferedWriterSpec::buffer_size(writer);

if read_buf >= DEFAULT_BUF_SIZE && read_buf >= write_buf {
return BufferedReaderSpec::copy_to(reader, writer);
}

BufferedWriterSpec::copy_from(writer, reader)
}

/// Specialization of the read-write loop that reuses the internal
/// buffer of a BufReader. If there's no buffer then the writer side
/// should be used intead.
trait BufferedReaderSpec {
fn buffer_size(&self) -> usize;

fn copy_to(&mut self, to: &mut (impl Write + ?Sized)) -> Result<u64>;
}

impl<T> BufferedReaderSpec for T
where
Self: Read,
T: ?Sized,
{
#[inline]
default fn buffer_size(&self) -> usize {
0
}

default fn copy_to(&mut self, _to: &mut (impl Write + ?Sized)) -> Result<u64> {
unimplemented!("only called from specializations");
}
}

impl<I> BufferedReaderSpec for BufReader<I>
where
Self: Read,
I: ?Sized,
{
fn buffer_size(&self) -> usize {
self.capacity()
}

fn copy_to(&mut self, to: &mut (impl Write + ?Sized)) -> Result<u64> {
let mut len = 0;

loop {
// Hack: this relies on `impl Read for BufReader` always calling fill_buf
// if the buffer is empty, even for empty slices.
// It can't be called directly here since specialization prevents us
// from adding I: Read
match self.read(&mut []) {
Ok(_) => {}
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
let buf = self.buffer();
if self.buffer().len() == 0 {
return Ok(len);
}

// In case the writer side is a BufWriter then its write_all
// implements an optimization that passes through large
// buffers to the underlying writer. That code path is #[cold]
// but we're still avoiding redundant memcopies when doing
// a copy between buffered inputs and outputs.
to.write_all(buf)?;
len += buf.len() as u64;
self.discard_buffer();
}
}
}

/// Specialization of the read-write loop that either uses a stack buffer
/// or reuses the internal buffer of a BufWriter
trait BufferedCopySpec: Write {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64>;
trait BufferedWriterSpec: Write {
fn buffer_size(&self) -> usize;

fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64>;
}

impl<W: Write + ?Sized> BufferedCopySpec for W {
default fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
stack_buffer_copy(reader, writer)
impl<W: Write + ?Sized> BufferedWriterSpec for W {
#[inline]
default fn buffer_size(&self) -> usize {
0
}

default fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
stack_buffer_copy(reader, self)
}
}

impl<I: ?Sized + Write> BufferedCopySpec for BufWriter<I> {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
if writer.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, writer);
impl<I: Write + ?Sized> BufferedWriterSpec for BufWriter<I> {
fn buffer_size(&self) -> usize {
self.capacity()
}

fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
if self.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, self);
}

let mut len = 0;
let mut init = 0;

loop {
let buf = writer.buffer_mut();
let buf = self.buffer_mut();
let mut read_buf: BorrowedBuf<'_> = buf.spare_capacity_mut().into();

unsafe {
Expand Down Expand Up @@ -127,7 +211,7 @@ impl<I: ?Sized + Write> BufferedCopySpec for BufWriter<I> {
Err(e) => return Err(e),
}
} else {
writer.flush_buf()?;
self.flush_buf()?;
init = 0;
}
}
Expand Down
108 changes: 108 additions & 0 deletions library/std/src/io/copy/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::cmp::{max, min};
use crate::io::*;

#[test]
fn copy_copies() {
let mut r = repeat(0).take(4);
let mut w = sink();
assert_eq!(copy(&mut r, &mut w).unwrap(), 4);

let mut r = repeat(0).take(1 << 17);
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}

struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}

impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}

struct WriteObserver {
observed_buffer: usize,
}

impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}

fn flush(&mut self) -> Result<()> {
Ok(())
}
}

#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}

#[test]
fn copy_specializes_bufreader() {
let mut source = vec![0; 768 * 1024];
source[1] = 42;
let mut buffered = BufReader::with_capacity(256 * 1024, Cursor::new(&mut source));

let mut sink = Vec::new();
assert_eq!(crate::io::copy(&mut buffered, &mut sink).unwrap(), source.len() as u64);
assert_eq!(source.as_slice(), sink.as_slice());

let buf_sz = 71 * 1024;
assert!(buf_sz > DEFAULT_BUF_SIZE, "test precondition");

let mut buffered = BufReader::with_capacity(buf_sz, Cursor::new(&mut source));
let mut sink = WriteObserver { observed_buffer: 0 };
assert_eq!(crate::io::copy(&mut buffered, &mut sink).unwrap(), source.len() as u64);
assert_eq!(
sink.observed_buffer, buf_sz,
"expected a large buffer to be provided to the writer"
);
}

#[cfg(unix)]
mod io_benches {
use crate::fs::File;
use crate::fs::OpenOptions;
use crate::io::prelude::*;
use crate::io::BufReader;

use test::Bencher;

#[bench]
fn bench_copy_buf_reader(b: &mut Bencher) {
let mut file_in = File::open("/dev/zero").expect("opening /dev/zero failed");
// use dyn to avoid specializations unrelated to readbuf
let dyn_in = &mut file_in as &mut dyn Read;
let mut reader = BufReader::with_capacity(256 * 1024, dyn_in.take(0));
let mut writer =
OpenOptions::new().write(true).open("/dev/null").expect("opening /dev/null failed");

const BYTES: u64 = 1024 * 1024;

b.bytes = BYTES;

b.iter(|| {
reader.get_mut().set_limit(BYTES);
crate::io::copy(&mut reader, &mut writer).unwrap()
});
}
}
61 changes: 1 addition & 60 deletions library/std/src/io/util/tests.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,8 @@
use crate::cmp::{max, min};
use crate::io::prelude::*;
use crate::io::{
copy, empty, repeat, sink, BorrowedBuf, BufWriter, Empty, Repeat, Result, SeekFrom, Sink,
DEFAULT_BUF_SIZE,
};
use crate::io::{empty, repeat, sink, BorrowedBuf, Empty, Repeat, SeekFrom, Sink};

use crate::mem::MaybeUninit;

#[test]
fn copy_copies() {
let mut r = repeat(0).take(4);
let mut w = sink();
assert_eq!(copy(&mut r, &mut w).unwrap(), 4);

let mut r = repeat(0).take(1 << 17);
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}

struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}

impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}

struct WriteObserver {
observed_buffer: usize,
}

impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}

fn flush(&mut self) -> Result<()> {
Ok(())
}
}

#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}

#[test]
fn sink_sinks() {
let mut s = sink();
Expand Down

0 comments on commit 7513407

Please sign in to comment.