Skip to content

Commit

Permalink
ignore received frames on a stream locally reset
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Nov 15, 2017
1 parent 79003d0 commit 21bcb0e
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 71 deletions.
12 changes: 6 additions & 6 deletions src/frame/go_away.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use bytes::{BigEndian, BufMut};
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct GoAway {
last_stream_id: StreamId,
error_code: u32,
error_code: Reason,
}

impl GoAway {
pub fn new(last_stream_id: StreamId, reason: Reason) -> Self {
GoAway {
last_stream_id,
error_code: reason.into(),
error_code: reason,
}
}

Expand All @@ -21,7 +21,7 @@ impl GoAway {
}

pub fn reason(&self) -> Reason {
self.error_code.into()
self.error_code
}

pub fn load(payload: &[u8]) -> Result<GoAway, Error> {
Expand All @@ -34,16 +34,16 @@ impl GoAway {

Ok(GoAway {
last_stream_id: last_stream_id,
error_code: error_code,
error_code: error_code.into(),
})
}

pub fn encode<B: BufMut>(&self, dst: &mut B) {
trace!("encoding GO_AWAY; code={}", self.error_code);
trace!("encoding GO_AWAY; code={:?}", self.error_code);
let head = Head::new(Kind::GoAway, 0, StreamId::zero());
head.encode(8, dst);
dst.put_u32::<BigEndian>(self.last_stream_id.into());
dst.put_u32::<BigEndian>(self.error_code);
dst.put_u32::<BigEndian>(self.error_code.into());
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/frame/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use bytes::{BigEndian, BufMut};
#[derive(Debug, Eq, PartialEq)]
pub struct Reset {
stream_id: StreamId,
error_code: u32,
error_code: Reason,
}

impl Reset {
pub fn new(stream_id: StreamId, error: Reason) -> Reset {
Reset {
stream_id,
error_code: error.into(),
error_code: error,
}
}

Expand All @@ -21,7 +21,7 @@ impl Reset {
}

pub fn reason(&self) -> Reason {
self.error_code.into()
self.error_code
}

pub fn load(head: Head, payload: &[u8]) -> Result<Reset, Error> {
Expand All @@ -33,19 +33,19 @@ impl Reset {

Ok(Reset {
stream_id: head.stream_id(),
error_code: error_code,
error_code: error_code.into(),
})
}

pub fn encode<B: BufMut>(&self, dst: &mut B) {
trace!(
"encoding RESET; id={:?} code={}",
"encoding RESET; id={:?} code={:?}",
self.stream_id,
self.error_code
);
let head = Head::new(Kind::Reset, 0, self.stream_id);
head.encode(4, dst);
dst.put_u32::<BigEndian>(self.error_code);
dst.put_u32::<BigEndian>(self.error_code.into());
}
}

Expand Down
34 changes: 25 additions & 9 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ impl Recv {
let sz = sz as WindowSize;

if !stream.state.is_recv_streaming() {
// TODO: There are cases where this can be a stream error of
// STREAM_CLOSED instead...

// Receiving a DATA frame when not expecting one is a protocol
// error.
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
Expand All @@ -368,19 +371,20 @@ impl Recv {

// Ensure that there is enough capacity on the connection before acting
// on the stream.
if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz {
return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR));
}
self.consume_connection_window(sz)?;

// Update connection level flow control
self.flow.send_data(sz);
if stream.recv_flow.window_size() < sz {
return Err(RecvError::Stream {
id: stream.id,
reason: Reason::FLOW_CONTROL_ERROR,
});
}

// Update stream level flow control
stream.recv_flow.send_data(sz);

// Track the data as in-flight
stream.in_flight_recv_data += sz;
self.in_flight_data += sz;

if stream.dec_content_length(frame.payload().len()).is_err() {
return Err(RecvError::Stream {
Expand Down Expand Up @@ -411,6 +415,19 @@ impl Recv {
Ok(())
}

pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> {
if self.flow.window_size() < sz {
return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR));
}

// Update connection level flow control
self.flow.send_data(sz);

// Track the data as in-flight
self.in_flight_data += sz;
Ok(())
}

pub fn recv_push_promise(
&mut self,
frame: frame::PushPromise,
Expand Down Expand Up @@ -475,15 +492,14 @@ impl Recv {
Ok(())
}

/// Handle remote sending an explicit RST_STREAM.
pub fn recv_reset(
&mut self,
frame: frame::Reset,
stream: &mut Stream,
) -> Result<(), RecvError> {
let err = proto::Error::Proto(frame.reason());

// Notify the stream
stream.state.recv_err(&err);
stream.state.recv_reset(frame.reason());
stream.notify_recv();
Ok(())
}
Expand Down
Loading

0 comments on commit 21bcb0e

Please sign in to comment.