Skip to content

Commit

Permalink
codec: update stream impl for Framed to return None after Err (#4166
Browse files Browse the repository at this point in the history
)
  • Loading branch information
bIgBV authored Oct 26, 2021
1 parent 827694a commit 0c68b89
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 26 deletions.
1 change: 1 addition & 0 deletions tokio-util/src/codec/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ where
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(capacity),
has_errored: false,
},
write: WriteFrame::default(),
},
Expand Down
82 changes: 56 additions & 26 deletions tokio-util/src/codec/framed_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ pin_project! {
const INITIAL_CAPACITY: usize = 8 * 1024;
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;

#[derive(Debug)]
pub(crate) struct ReadFrame {
pub(crate) eof: bool,
pub(crate) is_readable: bool,
pub(crate) buffer: BytesMut,
pub(crate) has_errored: bool,
}

pub(crate) struct WriteFrame {
Expand All @@ -49,6 +51,7 @@ impl Default for ReadFrame {
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
has_errored: false,
}
}
}
Expand All @@ -72,6 +75,7 @@ impl From<BytesMut> for ReadFrame {
buffer,
is_readable: size > 0,
eof: false,
has_errored: false,
}
}
}
Expand Down Expand Up @@ -126,30 +130,42 @@ where
//
// The initial state is `reading`.
//
// | state | eof | is_readable |
// |---------|-------|-------------|
// | reading | false | false |
// | framing | false | true |
// | pausing | true | true |
// | paused | true | false |
//
// `decode_eof`
// returns `Some` read 0 bytes
// │ │ │ │
// │ ▼ │ ▼
// ┌───────┐ `decode_eof` ┌──────┐
// ┌──read 0 bytes──▶│pausing│─returns `None`─▶│paused│──┐
// │ └───────┘ └──────┘ │
// pending read┐ │ ┌──────┐ │ ▲ │
// │ │ │ │ │ │ │ │
// │ ▼ │ │ `decode` returns `Some`│ pending read
// │ ╔═══════╗ ┌───────┐◀─┘ │
// └──║reading║─read n>0 bytes─▶│framing│ │
// ╚═══════╝ └───────┘◀──────read n>0 bytes┘
// ▲ │
// │ │
// └─`decode` returns `None`─┘
// | state | eof | is_readable | has_errored |
// |---------|-------|-------------|-------------|
// | reading | false | false | false |
// | framing | false | true | false |
// | pausing | true | true | false |
// | paused | true | false | false |
// | errored | <any> | <any> | true |
// `decode_eof` returns Err
// ┌────────────────────────────────────────────────────────┐
// `decode_eof` returns │ │
// `Ok(Some)` │ │
// ┌─────┐ │ `decode_eof` returns After returning │
// Read 0 bytes ├─────▼──┴┐ `Ok(None)` ┌────────┐ ◄───┐ `None` ┌───▼─────┐
// ┌────────────────►│ Pausing ├───────────────────────►│ Paused ├─┐ └───────────┤ Errored │
// │ └─────────┘ └─┬──▲───┘ │ └───▲───▲─┘
// Pending read │ │ │ │ │ │
// ┌──────┐ │ `decode` returns `Some` │ └─────┘ │ │
// │ │ │ ┌──────┐ │ Pending │ │
// │ ┌────▼──┴─┐ Read n>0 bytes ┌┴──────▼─┐ read n>0 bytes │ read │ │
// └─┤ Reading ├───────────────►│ Framing │◄────────────────────────┘ │ │
// └──┬─▲────┘ └─────┬──┬┘ │ │
// │ │ │ │ `decode` returns Err │ │
// │ └───decode` returns `None`──┘ └───────────────────────────────────────────────────────┘ │
// │ read returns Err │
// └────────────────────────────────────────────────────────────────────────────────────────────┘
loop {
// Return `None` if we have encountered an error from the underlying decoder
// See: https://github.com/tokio-rs/tokio/issues/3976
if state.has_errored {
// preparing has_errored -> paused
trace!("Returning None and setting paused");
state.is_readable = false;
state.has_errored = false;
return Poll::Ready(None);
}

// Repeatedly call `decode` or `decode_eof` while the buffer is "readable",
// i.e. it _might_ contain data consumable as a frame or closing frame.
// Both signal that there is no such data by returning `None`.
Expand All @@ -165,7 +181,11 @@ where
// pausing or framing
if state.eof {
// pausing
let frame = pinned.codec.decode_eof(&mut state.buffer)?;
let frame = pinned.codec.decode_eof(&mut state.buffer).map_err(|err| {
trace!("Got an error, going to errored state");
state.has_errored = true;
err
})?;
if frame.is_none() {
state.is_readable = false; // prepare pausing -> paused
}
Expand All @@ -176,7 +196,11 @@ where
// framing
trace!("attempting to decode a frame");

if let Some(frame) = pinned.codec.decode(&mut state.buffer)? {
if let Some(frame) = pinned.codec.decode(&mut state.buffer).map_err(|op| {
trace!("Got an error, going to errored state");
state.has_errored = true;
op
})? {
trace!("frame decoded from buffer");
// implicit framing -> framing
return Poll::Ready(Some(Ok(frame)));
Expand All @@ -190,7 +214,13 @@ where
// Make sure we've got room for at least one byte to read to ensure
// that we don't get a spurious 0 that looks like EOF.
state.buffer.reserve(1);
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer).map_err(
|err| {
trace!("Got an error, going to errored state");
state.has_errored = true;
err
},
)? {
Poll::Ready(ct) => ct,
// implicit reading -> reading or implicit paused -> paused
Poll::Pending => return Poll::Pending,
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/codec/framed_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ where
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(capacity),
has_errored: false,
},
},
}
Expand Down
38 changes: 38 additions & 0 deletions tokio-util/tests/framed_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use futures_core::stream::Stream;
use std::{io, pin::Pin};
use tokio_test::{assert_ready, io::Builder, task};
use tokio_util::codec::{BytesCodec, FramedRead};

macro_rules! pin {
($id:ident) => {
Pin::new(&mut $id)
};
}

macro_rules! assert_read {
($e:expr, $n:expr) => {{
let val = assert_ready!($e);
assert_eq!(val.unwrap().unwrap(), $n);
}};
}

#[tokio::test]
async fn return_none_after_error() {
let mut io = FramedRead::new(
Builder::new()
.read(b"abcdef")
.read_error(io::Error::new(io::ErrorKind::Other, "Resource errored out"))
.read(b"more data")
.build(),
BytesCodec::new(),
);

let mut task = task::spawn(());

task.enter(|cx, _| {
assert_read!(pin!(io).poll_next(cx), b"abcdef".to_vec());
assert!(assert_ready!(pin!(io).poll_next(cx)).unwrap().is_err());
assert!(assert_ready!(pin!(io).poll_next(cx)).is_none());
assert_read!(pin!(io).poll_next(cx), b"more data".to_vec());
})
}

0 comments on commit 0c68b89

Please sign in to comment.