Skip to content

Commit

Permalink
io: rewrite read_to_end and read_to_string (#2560)
Browse files Browse the repository at this point in the history
The new implementation changes the behavior such that set_len is called
after poll_read. The motivation of this change is that it makes it much
more obvious that a rouge panic won't give the caller access to a vector
containing exposed uninitialized memory. The new implementation also
makes sure to not zero memory twice.

Additionally, it makes the various implementations more consistent with
each other regarding the naming of variables, and whether we store how many
bytes we have read, or how many were in the container originally.

Fixes: #2544
  • Loading branch information
Darksonn authored Jul 28, 2020
1 parent 084fcd7 commit 03b68f4
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 158 deletions.
50 changes: 35 additions & 15 deletions tokio/src/io/util/read_line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::future::Future;
use std::io;
use std::mem;
use std::pin::Pin;
use std::string::FromUtf8Error;
use std::task::{Context, Poll};

cfg_io_util! {
Expand All @@ -16,7 +17,7 @@ cfg_io_util! {
/// This is the buffer we were provided. It will be replaced with an empty string
/// while reading to postpone utf-8 handling until after reading.
output: &'a mut String,
/// The actual allocation of the string is moved into a vector instead.
/// The actual allocation of the string is moved into this vector instead.
buf: Vec<u8>,
/// The number of bytes appended to buf. This can be less than buf.len() if
/// the buffer was not empty when the operation was started.
Expand All @@ -42,31 +43,33 @@ fn put_back_original_data(output: &mut String, mut vector: Vec<u8>, num_bytes_re
*output = String::from_utf8(vector).expect("The original data must be valid utf-8.");
}

pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
/// This handles the various failure cases and puts the string back into `output`.
///
/// The `truncate_on_io_error` bool is necessary because `read_to_string` and `read_line`
/// disagree on what should happen when an IO error occurs.
pub(super) fn finish_string_read(
io_res: io::Result<usize>,
utf8_res: Result<String, FromUtf8Error>,
read: usize,
output: &mut String,
buf: &mut Vec<u8>,
read: &mut usize,
truncate_on_io_error: bool,
) -> Poll<io::Result<usize>> {
let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));

// At this point both buf and output are empty. The allocation is in utf8_res.

debug_assert!(buf.is_empty());
match (io_res, utf8_res) {
(Ok(num_bytes), Ok(string)) => {
debug_assert_eq!(*read, 0);
debug_assert_eq!(read, 0);
*output = string;
Poll::Ready(Ok(num_bytes))
}
(Err(io_err), Ok(string)) => {
*output = string;
if truncate_on_io_error {
let original_len = output.len() - read;
output.truncate(original_len);
}
Poll::Ready(Err(io_err))
}
(Ok(num_bytes), Err(utf8_err)) => {
debug_assert_eq!(*read, 0);
debug_assert_eq!(read, 0);
put_back_original_data(output, utf8_err.into_bytes(), num_bytes);

Poll::Ready(Err(io::Error::new(
Expand All @@ -75,13 +78,30 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
)))
}
(Err(io_err), Err(utf8_err)) => {
put_back_original_data(output, utf8_err.into_bytes(), *read);
put_back_original_data(output, utf8_err.into_bytes(), read);

Poll::Ready(Err(io_err))
}
}
}

pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
output: &mut String,
buf: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));

// At this point both buf and output are empty. The allocation is in utf8_res.

debug_assert!(buf.is_empty());
debug_assert!(output.is_empty());
finish_string_read(io_res, utf8_res, *read, output, false)
}

impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
type Output = io::Result<usize>;

Expand Down
171 changes: 114 additions & 57 deletions tokio/src/io/util/read_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::io::AsyncRead;

use std::future::Future;
use std::io;
use std::mem::MaybeUninit;
use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -12,81 +12,136 @@ use std::task::{Context, Poll};
pub struct ReadToEnd<'a, R: ?Sized> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
start_len: usize,
/// The number of bytes appended to buf. This can be less than buf.len() if
/// the buffer was not empty when the operation was started.
read: usize,
}

pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buffer: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
where
R: AsyncRead + Unpin + ?Sized,
{
let start_len = buf.len();
prepare_buffer(buffer, reader);
ReadToEnd {
reader,
buf,
start_len,
buf: buffer,
read: 0,
}
}

struct Guard<'a> {
buf: &'a mut Vec<u8>,
len: usize,
}

impl Drop for Guard<'_> {
fn drop(&mut self) {
unsafe {
self.buf.set_len(self.len);
/// # Safety
///
/// Before first calling this method, the unused capacity must have been
/// prepared for use with the provided AsyncRead. This can be done using the
/// `prepare_buffer` function later in this file.
pub(super) unsafe fn read_to_end_internal<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
mut reader: Pin<&mut R>,
num_read: &mut usize,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
loop {
// safety: The caller promised to prepare the buffer.
let ret = ready!(poll_read_to_end(buf, reader.as_mut(), cx));
match ret {
Err(err) => return Poll::Ready(Err(err)),
Ok(0) => return Poll::Ready(Ok(mem::replace(num_read, 0))),
Ok(num) => {
*num_read += num;
}
}
}
}

// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
/// Tries to read from the provided AsyncRead.
///
/// The length of the buffer is increased by the number of bytes read.
///
/// # Safety
///
/// The caller ensures that the buffer has been prepared for use with the
/// AsyncRead before calling this function. This can be done using the
/// `prepare_buffer` function later in this file.
unsafe fn poll_read_to_end<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
start_len: usize,
read: Pin<&mut R>,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
let mut g = Guard {
len: buf.len(),
buf,
};
let ret;
loop {
if g.len == g.buf.len() {
unsafe {
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
reserve(buf, &*read, 32);

let b = &mut *(&mut g.buf[g.len..] as *mut [u8] as *mut [MaybeUninit<u8>]);
let unused_capacity: &mut [MaybeUninit<u8>] = get_unused_capacity(buf);

rd.prepare_uninitialized_buffer(b);
}
}
// safety: The buffer has been prepared for use with the AsyncRead before
// calling this function.
let slice: &mut [u8] = &mut *(unused_capacity as *mut [MaybeUninit<u8>] as *mut [u8]);

match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(g.len - start_len));
break;
}
Ok(n) => g.len += n,
Err(e) => {
ret = Poll::Ready(Err(e));
break;
}
}
let res = ready!(read.poll_read(cx, slice));
if let Ok(num) = res {
// safety: There are two situations:
//
// 1. The AsyncRead has not overriden `prepare_uninitialized_buffer`.
//
// In this situation, the default implementation of that method will have
// zeroed the unused capacity. This means that setting the length will
// never expose uninitialized memory in the vector.
//
// Note that the assert! below ensures that we don't set the length to
// something larger than the capacity, which malicious implementors might
// try to have us do.
//
// 2. The AsyncRead has overriden `prepare_uninitialized_buffer`.
//
// In this case, the safety of the `set_len` call below relies on this
// guarantee from the documentation on `prepare_uninitialized_buffer`:
//
// > This function isn't actually unsafe to call but unsafe to implement.
// > The implementer must ensure that either the whole buf has been zeroed
// > or poll_read() overwrites the buffer without reading it and returns
// > correct value.
//
// Note that `prepare_uninitialized_buffer` is unsafe to implement, so this
// is a guarantee we can rely on in unsafe code.
//
// The assert!() is technically only necessary in the first case.
let new_len = buf.len() + num;
assert!(new_len <= buf.capacity());

buf.set_len(new_len);
}
Poll::Ready(res)
}

/// This function prepares the unused capacity for use with the provided AsyncRead.
pub(super) fn prepare_buffer<R: AsyncRead + ?Sized>(buf: &mut Vec<u8>, read: &R) {
let buffer = get_unused_capacity(buf);

ret
// safety: This function is only unsafe to implement.
unsafe {
read.prepare_uninitialized_buffer(buffer);
}
}

/// Allocates more memory and ensures that the unused capacity is prepared for use
/// with the `AsyncRead`.
fn reserve<R: AsyncRead + ?Sized>(buf: &mut Vec<u8>, read: &R, bytes: usize) {
if buf.capacity() - buf.len() >= bytes {
return;
}
buf.reserve(bytes);
// The call above has reallocated the buffer, so we must reinitialize the entire
// unused capacity, even if we already initialized some of it before the resize.
prepare_buffer(buf, read);
}

/// Returns the unused capacity of the provided vector.
fn get_unused_capacity(buf: &mut Vec<u8>) -> &mut [MaybeUninit<u8>] {
bytes::BufMut::bytes_mut(buf)
}

impl<A> Future for ReadToEnd<'_, A>
Expand All @@ -96,8 +151,10 @@ where
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
let Self { reader, buf, read } = &mut *self;

// safety: The constructor of ReadToEnd calls `prepare_buffer`
unsafe { read_to_end_internal(buf, Pin::new(*reader), read, cx) }
}
}

Expand Down
Loading

0 comments on commit 03b68f4

Please sign in to comment.