Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignore received frames on a stream locally reset #174

Merged
merged 9 commits into from
Dec 18, 2017
37 changes: 35 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio_io::io::WriteAll;

use std::fmt;
use std::marker::PhantomData;
use std::time::Duration;

/// In progress H2 connection binding
#[must_use = "futures do nothing unless polled"]
Expand Down Expand Up @@ -45,6 +46,12 @@ pub struct ResponseFuture {
/// Build a Client.
#[derive(Clone, Debug)]
pub struct Builder {
/// Time to keep locally reset streams around before reaping.
reset_stream_duration: Duration,

/// Maximum number of locally reset streams to keep at a time.
reset_stream_max: usize,

/// Initial `Settings` frame to send as part of the handshake.
settings: Settings,

Expand Down Expand Up @@ -208,6 +215,26 @@ impl Builder {
self
}

/// Set the maximum number of concurrent locally reset streams.
///
/// Locally reset streams are to "ignore frames from the peer for some
/// time". While waiting for that time, locally reset streams "waste"
/// space in order to be able to ignore those frames. This setting
/// can limit how many extra streams are left waiting for "some time".
pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
self.reset_stream_max = max;
self
}

/// Set the maximum number of concurrent locally reset streams.
///
/// Locally reset streams are to "ignore frames from the peer for some
/// time", but that time is unspecified. Set that time with this setting.
pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
self.reset_stream_duration = dur;
self
}

/// Enable or disable the server to send push promises.
pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
self.settings.set_enable_push(enabled);
Expand Down Expand Up @@ -245,6 +272,8 @@ impl Builder {
impl Default for Builder {
fn default() -> Builder {
Builder {
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
settings: Default::default(),
stream_id: 1.into(),
}
Expand Down Expand Up @@ -324,8 +353,12 @@ where
.buffer(self.builder.settings.clone().into())
.expect("invalid SETTINGS frame");

let connection =
proto::Connection::new(codec, &self.builder.settings, self.builder.stream_id);
let connection = proto::Connection::new(codec, proto::Config {
next_stream_id: self.builder.stream_id,
reset_stream_duration: self.builder.reset_stream_duration,
reset_stream_max: self.builder.reset_stream_max,
settings: self.builder.settings.clone(),
});
let client = Client {
inner: connection.streams().clone(),
pending: None,
Expand Down
12 changes: 6 additions & 6 deletions src/frame/go_away.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use bytes::{BigEndian, BufMut};
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct GoAway {
last_stream_id: StreamId,
error_code: u32,
error_code: Reason,
}

impl GoAway {
pub fn new(last_stream_id: StreamId, reason: Reason) -> Self {
GoAway {
last_stream_id,
error_code: reason.into(),
error_code: reason,
}
}

Expand All @@ -21,7 +21,7 @@ impl GoAway {
}

pub fn reason(&self) -> Reason {
self.error_code.into()
self.error_code
}

pub fn load(payload: &[u8]) -> Result<GoAway, Error> {
Expand All @@ -34,16 +34,16 @@ impl GoAway {

Ok(GoAway {
last_stream_id: last_stream_id,
error_code: error_code,
error_code: error_code.into(),
})
}

pub fn encode<B: BufMut>(&self, dst: &mut B) {
trace!("encoding GO_AWAY; code={}", self.error_code);
trace!("encoding GO_AWAY; code={:?}", self.error_code);
let head = Head::new(Kind::GoAway, 0, StreamId::zero());
head.encode(8, dst);
dst.put_u32::<BigEndian>(self.last_stream_id.into());
dst.put_u32::<BigEndian>(self.error_code);
dst.put_u32::<BigEndian>(self.error_code.into());
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/frame/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use bytes::{BigEndian, BufMut};
#[derive(Debug, Eq, PartialEq)]
pub struct Reset {
stream_id: StreamId,
error_code: u32,
error_code: Reason,
}

impl Reset {
pub fn new(stream_id: StreamId, error: Reason) -> Reset {
Reset {
stream_id,
error_code: error.into(),
error_code: error,
}
}

Expand All @@ -21,7 +21,7 @@ impl Reset {
}

pub fn reason(&self) -> Reason {
self.error_code.into()
self.error_code
}

pub fn load(head: Head, payload: &[u8]) -> Result<Reset, Error> {
Expand All @@ -33,19 +33,19 @@ impl Reset {

Ok(Reset {
stream_id: head.stream_id(),
error_code: error_code,
error_code: error_code.into(),
})
}

pub fn encode<B: BufMut>(&self, dst: &mut B) {
trace!(
"encoding RESET; id={:?} code={}",
"encoding RESET; id={:?} code={:?}",
self.stream_id,
self.error_code
);
let head = Head::new(Kind::Reset, 0, self.stream_id);
head.encode(4, dst);
dst.put_u32::<BigEndian>(self.error_code);
dst.put_u32::<BigEndian>(self.error_code.into());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![deny(warnings, missing_debug_implementations, missing_docs)]
//#![deny(warnings, missing_debug_implementations, missing_docs)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this stay enabled? I assume this line was accidentally committed.


//! HTTP2

Expand Down
30 changes: 23 additions & 7 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {client, frame, proto, server};
use codec::RecvError;
use frame::Reason;
use frame::{Reason, StreamId};

use frame::DEFAULT_INITIAL_WINDOW_SIZE;
use proto::*;
Expand All @@ -10,6 +10,7 @@ use futures::Stream;
use tokio_io::{AsyncRead, AsyncWrite};

use std::marker::PhantomData;
use std::time::Duration;

/// An H2 connection
#[derive(Debug)]
Expand Down Expand Up @@ -42,6 +43,14 @@ where
_phantom: PhantomData<P>,
}

#[derive(Debug, Clone)]
pub(crate) struct Config {
pub next_stream_id: StreamId,
pub reset_stream_duration: Duration,
pub reset_stream_max: usize,
pub settings: frame::Settings,
}

#[derive(Debug)]
enum State {
/// Currently open in a sane state
Expand All @@ -65,18 +74,19 @@ where
{
pub fn new(
codec: Codec<T, Prioritized<B::Buf>>,
settings: &frame::Settings,
next_stream_id: frame::StreamId,
config: Config,
) -> Connection<T, P, B> {
let streams = Streams::new(streams::Config {
local_init_window_sz: settings
local_init_window_sz: config.settings
.initial_window_size()
.unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
local_max_initiated: None,
local_next_stream_id: next_stream_id,
local_push_enabled: settings.is_push_enabled(),
local_next_stream_id: config.next_stream_id,
local_push_enabled: config.settings.is_push_enabled(),
local_reset_duration: config.reset_stream_duration,
local_reset_max: config.reset_stream_max,
remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
remote_max_initiated: settings
remote_max_initiated: config.settings
.max_concurrent_streams()
.map(|max| max as usize),
});
Expand Down Expand Up @@ -230,6 +240,8 @@ where
fn poll2(&mut self) -> Poll<(), RecvError> {
use frame::Frame::*;

self.clear_expired_reset_streams();

loop {
// First, ensure that the `Connection` is able to receive a frame
try_ready!(self.poll_ready());
Expand Down Expand Up @@ -284,6 +296,10 @@ where
}
}
}

fn clear_expired_reset_streams(&mut self) {
self.streams.clear_expired_reset_streams();
}
}

impl<T, B> Connection<T, client::Peer, B>
Expand Down
4 changes: 3 additions & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod ping_pong;
mod settings;
mod streams;

pub(crate) use self::connection::Connection;
pub(crate) use self::connection::{Config, Connection};
pub(crate) use self::error::Error;
pub(crate) use self::peer::{Peer, Dyn as DynPeer};
pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams};
Expand All @@ -31,3 +31,5 @@ pub type WindowSize = u32;

// Constants
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
44 changes: 41 additions & 3 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub(super) struct Counts {

/// Current number of locally initiated streams
num_recv_streams: usize,

/// Maximum number of pending locally reset streams
max_reset_streams: usize,

/// Current number of pending locally reset streams
num_reset_streams: usize,
}

impl Counts {
Expand All @@ -30,6 +36,8 @@ impl Counts {
num_send_streams: 0,
max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
num_recv_streams: 0,
max_reset_streams: config.local_reset_max,
num_reset_streams: 0,
}
}

Expand Down Expand Up @@ -72,6 +80,22 @@ impl Counts {
self.num_send_streams += 1;
}

/// Returns true if the number of pending reset streams can be incremented.
pub fn can_inc_num_reset_streams(&self) -> bool {
self.max_reset_streams > self.num_reset_streams
}

/// Increments the number of pending reset streams.
///
/// # Panics
///
/// Panics on failure as this should have been validated before hand.
pub fn inc_num_reset_streams(&mut self) {
assert!(self.can_inc_num_reset_streams());

self.num_reset_streams += 1;
}

pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
Expand All @@ -87,19 +111,26 @@ impl Counts {
F: FnOnce(&mut Self, &mut store::Ptr) -> U,
{
let is_counted = stream.is_counted();
let is_pending_reset = stream.is_pending_reset_expiration();

// Run the action
let ret = f(self, &mut stream);

self.transition_after(stream, is_counted);
self.transition_after(stream, is_counted, is_pending_reset);

ret
}

// TODO: move this to macro?
pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool) {
pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool, is_reset_counted: bool) {
if stream.is_closed() {
stream.unlink();
if !stream.is_pending_reset_expiration() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that I'm following this logic (and that it is correct), but it would be super helpful to add a comment describing how transition_after works now that it is getting pretty involved.

stream.unlink();

if is_reset_counted {
self.dec_num_reset_streams();
}
}

if is_counted {
// Decrement the number of active streams.
Expand All @@ -115,9 +146,16 @@ impl Counts {

fn dec_num_streams(&mut self, id: StreamId) {
if self.peer.is_local_init(id) {
assert!(self.num_send_streams > 0);
self.num_send_streams -= 1;
} else {
assert!(self.num_recv_streams > 0);
self.num_recv_streams -= 1;
}
}

fn dec_num_reset_streams(&mut self) {
assert!(self.num_reset_streams > 0);
self.num_reset_streams -= 1;
}
}
9 changes: 8 additions & 1 deletion src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use self::prioritize::Prioritize;
use self::recv::Recv;
use self::send::Send;
use self::state::State;
use self::store::{Entry, Store};
use self::store::Store;
use self::stream::Stream;

use frame::{StreamId, StreamIdOverflow};
use proto::*;

use std::time::Duration;
use bytes::Bytes;
use http::{Request, Response};

Expand All @@ -43,6 +44,12 @@ pub struct Config {
/// If the local peer is willing to receive push promises
pub local_push_enabled: bool,

/// How long a locally reset stream should ignore frames
pub local_reset_duration: Duration,

/// Maximum number of locally reset streams to keep at a time
pub local_reset_max: usize,

/// Initial window size of remote initiated streams
pub remote_init_window_sz: WindowSize,

Expand Down
Loading