Skip to content

Commit

Permalink
rewrite read_to_end and read_to_string
Browse files Browse the repository at this point in the history
The new implementation changes the behaviour such that set_len is called
after poll_read. The motivation of this change is that it makes it much
more obvious that a rogue panic won't give the caller access to a vector
containing exposed uninitialized memory. The new implementation also
makes sure to not zero memory twice.

Additionally it makes the various implementations more consistent with
each other regarding naming of variables, and whether we store how many
bytes we have read, or how many were in the container originally.

Fixes: tokio-rs#2544
  • Loading branch information
Darksonn committed May 21, 2020
1 parent 4157a5c commit 0e13b7b
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 108 deletions.
49 changes: 34 additions & 15 deletions tokio/src/io/util/read_line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::future::Future;
use std::io;
use std::mem;
use std::pin::Pin;
use std::string::FromUtf8Error;
use std::task::{Context, Poll};

cfg_io_util! {
Expand All @@ -16,7 +17,7 @@ cfg_io_util! {
/// This is the buffer we were provided. It will be replaced with an empty string
/// while reading to postpone utf-8 handling until after reading.
output: &'a mut String,
/// The actual allocation of the string is moved into a vector instead.
/// The actual allocation of the string is moved into this vector instead.
buf: Vec<u8>,
/// The number of bytes appended to buf. This can be less than buf.len() if
/// the buffer was not empty when the operation was started.
Expand All @@ -42,31 +43,33 @@ fn put_back_original_data(output: &mut String, mut vector: Vec<u8>, num_bytes_re
*output = String::from_utf8(vector).expect("The original data must be valid utf-8.");
}

pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
/// This handles the various failure cases and puts the string back into `output`.
///
/// The `truncate_on_io_error` bool is necessary because `read_to_string` and `read_line`
/// disagree on what should happen when an IO error occurs.
pub(super) fn finish_string_read(
io_res: io::Result<usize>,
utf8_res: Result<String, FromUtf8Error>,
read: usize,
output: &mut String,
buf: &mut Vec<u8>,
read: &mut usize,
truncate_on_io_error: bool,
) -> Poll<io::Result<usize>> {
let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));

// At this point both buf and output are empty. The allocation is in utf8_res.

debug_assert!(buf.is_empty());
match (io_res, utf8_res) {
(Ok(num_bytes), Ok(string)) => {
debug_assert_eq!(*read, 0);
debug_assert_eq!(read, 0);
*output = string;
Poll::Ready(Ok(num_bytes))
}
(Err(io_err), Ok(string)) => {
*output = string;
if truncate_on_io_error {
let original_len = output.len() - read;
output.truncate(original_len);
}
Poll::Ready(Err(io_err))
}
(Ok(num_bytes), Err(utf8_err)) => {
debug_assert_eq!(*read, 0);
debug_assert_eq!(read, 0);
put_back_original_data(output, utf8_err.into_bytes(), num_bytes);

Poll::Ready(Err(io::Error::new(
Expand All @@ -75,13 +78,29 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
)))
}
(Err(io_err), Err(utf8_err)) => {
put_back_original_data(output, utf8_err.into_bytes(), *read);
put_back_original_data(output, utf8_err.into_bytes(), read);

Poll::Ready(Err(io_err))
}
}
}

pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
output: &mut String,
buf: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));

// At this point both buf and output are empty. The allocation is in utf8_res.

debug_assert!(buf.is_empty());
finish_string_read(io_res, utf8_res, *read, output, false)
}

impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
type Output = io::Result<usize>;

Expand Down
187 changes: 131 additions & 56 deletions tokio/src/io/util/read_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::io::AsyncRead;

use std::future::Future;
use std::io;
use std::mem::MaybeUninit;
use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -12,81 +12,154 @@ use std::task::{Context, Poll};
pub struct ReadToEnd<'a, R: ?Sized> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
start_len: usize,
/// The number of bytes appended to buf. This can be less than buf.len() if
/// the buffer was not empty when the operation was started.
read: usize,
}

pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buffer: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
where
R: AsyncRead + Unpin + ?Sized,
{
let start_len = buf.len();
prepare_buffer(buffer, reader);
ReadToEnd {
reader,
buf,
start_len,
buf: buffer,
read: 0,
}
}

struct Guard<'a> {
buf: &'a mut Vec<u8>,
len: usize,
}

impl Drop for Guard<'_> {
fn drop(&mut self) {
unsafe {
self.buf.set_len(self.len);
/// SAFETY: Before first calling this method, the unused capacity must have been
/// prepared for use with the provided AsyncRead. This can be done using the
/// `prepare_buffer` function later in this file.
pub(super) unsafe fn read_to_end_internal<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
mut reader: Pin<&mut R>,
num_read: &mut usize,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
loop {
// safety: The caller promised to prepare the buffer.
let ret = ready!(poll_read_to_end(buf, reader.as_mut(), cx));
match ret {
Err(err) => return Poll::Ready(Err(err)),
Ok(0) => return Poll::Ready(Ok(mem::replace(num_read, 0))),
Ok(num) => {
*num_read += num;
}
}
}
}

// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
/// Tries to read from the provided AsyncRead.
///
/// The length of the buffer is increased by the number of bytes read.
///
/// SAFETY: Before first calling this method, the unused capacity must have been
/// prepared for use with the provided AsyncRead. This can be done using the
/// `prepare_buffer` function later in this file.
unsafe fn poll_read_to_end<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
start_len: usize,
read: Pin<&mut R>,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
let mut g = Guard {
len: buf.len(),
buf,
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
reserve(buf, &*read, 32);

let unused_capacity: &mut [MaybeUninit<u8>] = get_unused_capacity(buf);

// safety: The buffer has been prepared for use with the AsyncRead before
// calling this function.
let slice: &mut [u8] = {
let ptr = unused_capacity.as_mut_ptr().cast::<u8>();
let len = unused_capacity.len();
std::slice::from_raw_parts_mut(ptr, len)
};
let ret;
loop {
if g.len == g.buf.len() {
unsafe {
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);

let b = &mut *(&mut g.buf[g.len..] as *mut [u8] as *mut [MaybeUninit<u8>]);
let res = ready!(read.poll_read(cx, slice));
if let Ok(num) = res {
// safety: There are two situations:
//
// 1. The AsyncRead has not overridden `prepare_uninitialized_buffer`.
//
// In this situation, the default implementation of that method will have
// zeroed the unused capacity. This means that setting the length will
// never expose uninitialized memory in the vector.
//
// Note that the assert! below ensures that we don't set the length to
// something larger than the capacity, which malicious implementors might
// try to have us do.
//
// 2. The AsyncRead has overridden `prepare_uninitialized_buffer`.
//
// In this case, the safety of the `set_len` call below relies on this
// guarantee from the documentation on `prepare_uninitialized_buffer`:
//
// > This function isn't actually unsafe to call but unsafe to implement.
// > The implementer must ensure that either the whole buf has been zeroed
// > or poll_read() overwrites the buffer without reading it and returns
// > correct value.
//
// Note that `prepare_uninitialized_buffer` is unsafe to implement, so this
// is a guarantee we can rely on in unsafe code.
//
// The assert!() is technically only necessary in the first case.
let new_len = buf.len() + num;
assert!(new_len <= buf.capacity());

rd.prepare_uninitialized_buffer(b);
}
}
buf.set_len(new_len);
}
Poll::Ready(res)
}

match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(g.len - start_len));
break;
}
Ok(n) => g.len += n,
Err(e) => {
ret = Poll::Ready(Err(e));
break;
}
}
/// Prepares the unused capacity of `buf` for use with the provided AsyncRead.
///
/// The region between `buf.len()` and `buf.capacity()` is passed to
/// `read.prepare_uninitialized_buffer`. The length of `buf` is not changed.
pub(super) fn prepare_buffer<R: AsyncRead + ?Sized>(buf: &mut Vec<u8>, read: &R) {
    // safety: `prepare_uninitialized_buffer` is only unsafe to implement, not
    // to call.
    unsafe {
        read.prepare_uninitialized_buffer(get_unused_capacity(buf));
    }
}

/// Makes sure `buf` has room for at least `bytes` more bytes, and that any newly
/// obtained unused capacity is prepared for use with the `AsyncRead`.
///
/// Nothing happens when the existing unused capacity is already large enough.
fn reserve<R: AsyncRead + ?Sized>(buf: &mut Vec<u8>, read: &R, bytes: usize) {
    let spare = buf.capacity() - buf.len();
    if spare < bytes {
        buf.reserve(bytes);
        // Growing the vector has reallocated the buffer, so the whole unused
        // capacity is prepared again, even the part that had already been
        // prepared before the resize.
        prepare_buffer(buf, read);
    }
}

ret
/// Returns the unused capacity of the provided vector.
///
/// The returned slice starts right after the last initialized element of `buf`
/// and covers `buf.capacity() - buf.len()` bytes. The length of `buf` is not
/// changed.
///
/// This function does not need to be marked unsafe, as `MaybeUninit<u8>` does not
/// require the underlying memory to be initialized.
fn get_unused_capacity(buf: &mut Vec<u8>) -> &mut [MaybeUninit<u8>] {
    let prepare_from = buf.len();
    let prepare_len = buf.capacity() - prepare_from;

    // safety: `prepare_from` is the length of the vector, which never exceeds
    // its capacity, so the resulting pointer is inside, or one past the end of,
    // the allocation owned by the vector.
    let ptr = unsafe {
        buf.as_mut_ptr()
            .add(prepare_from)
            .cast::<MaybeUninit<u8>>()
    };

    // safety: The memory is properly allocated due to the invariants provided by
    // Vec<u8>, and since the item type is MaybeUninit<u8>, it is safe for the
    // memory to be uninitialized.
    let slice: &mut [MaybeUninit<u8>] =
        unsafe { std::slice::from_raw_parts_mut(ptr, prepare_len) };
    debug_assert_eq!(slice.len() + buf.len(), buf.capacity());
    debug_assert_eq!(ptr as usize - buf.as_ptr() as usize, buf.len());

    slice
}

impl<A> Future for ReadToEnd<'_, A>
Expand All @@ -96,8 +169,10 @@ where
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
let Self { reader, buf, read } = &mut *self;

// safety: The constructor of ReadToEnd calls `prepare_buffer`
unsafe { read_to_end_internal(buf, Pin::new(*reader), read, cx) }
}
}

Expand Down
Loading

0 comments on commit 0e13b7b

Please sign in to comment.