Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto-tune (dynamic) stream receive window #176

Merged
merged 39 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
71f60c4
feat: dynamic stream window
mxinden Nov 22, 2023
c6ca2e5
expose rtt
mxinden Nov 22, 2023
b8764d2
Introduce connection limit
mxinden Nov 23, 2023
1839e8b
improve logging
mxinden Nov 23, 2023
5dc40c2
less logging
mxinden Nov 23, 2023
dd4cf3a
Unbounded stream receive window
mxinden Nov 23, 2023
24ca871
Rename config options
mxinden Nov 24, 2023
5ee74d1
Remove max_buffer_size and rename variables
mxinden Nov 24, 2023
504e876
Revert benchmark changes
mxinden Nov 24, 2023
3e9a870
Refactor rtt
mxinden Nov 24, 2023
8de55cf
randomize nonce
mxinden Nov 24, 2023
6630c9d
Revert to u32 and add quickcheck
mxinden Nov 24, 2023
df2e62c
Minor clean-ups
mxinden Nov 25, 2023
3fda972
Add assertions
mxinden Nov 25, 2023
2892b8d
Fix deadlock
mxinden Nov 25, 2023
85aef76
Move rtt into own module
mxinden Nov 27, 2023
a43fbee
Improve docs
mxinden Nov 27, 2023
bb0c6a5
Undo test changes
mxinden Nov 27, 2023
5d93aa2
Remove Stream::set_flag
mxinden Nov 29, 2023
cf76ccf
Document MAX_FRAME_BODY_LEN
mxinden Nov 29, 2023
b9b74db
Introduce flow_control module
mxinden Dec 1, 2023
55872bf
Remove Config::max_stream_receive_window
mxinden Dec 3, 2023
4d59c6d
Reduce diff
mxinden Dec 3, 2023
0ac5534
Remove unreachable buffer length exceeded
mxinden Dec 3, 2023
c8269b6
docs
mxinden Dec 3, 2023
fc6aaf7
Add explainer for asserts
mxinden Dec 3, 2023
d3a22b8
Remove obsolete code
mxinden Dec 3, 2023
db2971b
fmt
mxinden Dec 3, 2023
8d69551
fmt
mxinden Dec 3, 2023
8261e12
Define and use KIB MIB and GIB constants
mxinden Dec 4, 2023
7581f76
Document DOS mitigation
mxinden Dec 4, 2023
c9c71b6
Document no ping on idle connection
mxinden Dec 4, 2023
82106b0
Derive Clone
mxinden Dec 4, 2023
c3f581f
Remove RttState::Initial
mxinden Dec 4, 2023
33c6b05
Use `Duration`'s `fmt::Debug` implementation
mxinden Dec 4, 2023
68e5e1c
Refactor string formatting
mxinden Dec 4, 2023
aec5fd5
Add changelog entry
mxinden Dec 4, 2023
79c0d11
Use shadowing over mut
mxinden Dec 4, 2023
6b64a9f
Remove current_ and _size
mxinden Dec 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
members = ["yamux", "test-harness"]
members = ["yamux", "test-harness", "quickcheck-ext"]
resolver = "2"
13 changes: 13 additions & 0 deletions quickcheck-ext/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "quickcheck-ext"
version = "0.1.0"
edition = "2021"
publish = false
license = "Unlicense/MIT"

[package.metadata.release]
release = false

[dependencies]
quickcheck = "1"
num-traits = "0.2"
46 changes: 46 additions & 0 deletions quickcheck-ext/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

pub use quickcheck::*;

use core::ops::Range;
use num_traits::sign::Unsigned;

pub trait GenRange {
fn gen_range<T: Unsigned + Arbitrary + Copy>(&mut self, _range: Range<T>) -> T;

fn gen_index(&mut self, ubound: usize) -> usize {
if ubound <= (core::u32::MAX as usize) {
self.gen_range(0..ubound as u32) as usize
} else {
self.gen_range(0..ubound)
}
}
}

impl GenRange for Gen {
fn gen_range<T: Unsigned + Arbitrary + Copy>(&mut self, range: Range<T>) -> T {
<T as Arbitrary>::arbitrary(self) % (range.end - range.start) + range.start
}
}

pub trait SliceRandom {
fn shuffle<T>(&mut self, arr: &mut [T]);
fn choose_multiple<'a, T>(
&mut self,
arr: &'a [T],
amount: usize,
) -> std::iter::Take<std::vec::IntoIter<&'a T>> {
let mut v: Vec<&T> = arr.iter().collect();
self.shuffle(&mut v);
v.into_iter().take(amount)
}
}

impl SliceRandom for Gen {
fn shuffle<T>(&mut self, arr: &mut [T]) {
for i in (1..arr.len()).rev() {
// invariant: elements with index > i have been locked in place.
arr.swap(i, self.gen_index(i + 1));
}
}
}
3 changes: 1 addition & 2 deletions test-harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false
[dependencies]
yamux = { path = "../yamux" }
futures = "0.3.4"
quickcheck = "1.0"
quickcheck = { package = "quickcheck-ext", path = "../quickcheck-ext" }
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.7", features = ["compat"] }
anyhow = "1"
Expand All @@ -17,7 +17,6 @@ log = "0.4.17"
criterion = "0.5"
env_logger = "0.10"
futures = "0.3.4"
quickcheck = "1.0"
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.7", features = ["compat"] }
constrained-connection = "0.1"
Expand Down
8 changes: 6 additions & 2 deletions test-harness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where
.try_for_each_concurrent(None, |mut stream| async move {
{
let (mut r, mut w) = AsyncReadExt::split(&mut stream);
futures::io::copy(&mut r, &mut w).await?;
futures::io::copy(&mut r, &mut w).await.unwrap();
}
stream.close().await?;
Ok(())
Expand Down Expand Up @@ -449,7 +449,11 @@ impl Arbitrary for TestConfig {
fn arbitrary(g: &mut Gen) -> Self {
let mut c = Config::default();
c.set_read_after_close(Arbitrary::arbitrary(g));
c.set_receive_window(256 * 1024 + u32::arbitrary(g) % (768 * 1024));
if bool::arbitrary(g) {
c.set_max_stream_receive_window(Some(256 * 1024 + u32::arbitrary(g) % (768 * 1024)));
} else {
c.set_max_stream_receive_window(None);
}
TestConfig(c)
}
}
4 changes: 2 additions & 2 deletions test-harness/tests/ack_backlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ where
this.worker_streams.push(ping_pong(stream.unwrap()).boxed());
continue;
}
(Poll::Ready(_), Some(_)) => {
panic!("should not be able to open stream if server hasn't acknowledged existing streams")
(Poll::Ready(e), Some(_)) => {
panic!("should not be able to open stream if server hasn't acknowledged existing streams: {:?}", e)
}
(Poll::Pending, None) => {}
}
Expand Down
6 changes: 2 additions & 4 deletions test-harness/tests/poll_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,11 @@ fn prop_config_send_recv_single() {
msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize]));

Runtime::new().unwrap().block_on(async move {
let (server, mut client) = connected_peers(cfg1, cfg2, None).await?;
let (server, mut client) = connected_peers(cfg1, cfg2, None).await.unwrap();
let server = echo_server(server);

let client = async {
let stream = future::poll_fn(|cx| client.poll_new_outbound(cx))
.await
.unwrap();
let stream = future::poll_fn(|cx| client.poll_new_outbound(cx)).await?;
let client_task = noop_server(stream::poll_fn(|cx| client.poll_next_inbound(cx)));

future::select(pin!(client_task), pin!(send_on_single_stream(stream, msgs))).await;
Expand Down
4 changes: 2 additions & 2 deletions yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repository = "https://github.com/paritytech/yamux"
edition = "2021"

[dependencies]
futures = { version = "0.3.12", default-features = false, features = ["std"] }
futures = { version = "0.3.12", default-features = false, features = ["std", "executor"] }
log = "0.4.8"
nohash-hasher = "0.2"
parking_lot = "0.12"
Expand All @@ -20,4 +20,4 @@ pin-project = "1.1.0"

[dev-dependencies]
futures = { version = "0.3.12", default-features = false, features = ["executor"] }
quickcheck = "1.0"
quickcheck = { package = "quickcheck-ext", path = "../quickcheck-ext" }
87 changes: 53 additions & 34 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod cleanup;
mod closing;
mod rtt;
mod stream;

use crate::tagged_stream::TaggedStream;
Expand Down Expand Up @@ -287,8 +288,15 @@ struct Active<T> {

pending_frames: VecDeque<Frame<()>>,
new_outbound_stream_waker: Option<Waker>,
}

rtt: rtt::Rtt,

/// A stream's `max_stream_receive_window` can grow beyond [`DEFAULT_CREDIT`], see
/// [`Stream::next_window_update`]. This field is the sum of the bytes by which all streams'
/// `max_stream_receive_window` have each exceeded [`DEFAULT_CREDIT`]. Used to enforce
/// [`Config::max_connection_receive_window`].
accumulated_max_stream_windows: Arc<Mutex<usize>>,
}
/// `Stream` to `Connection` commands.
#[derive(Debug)]
pub(crate) enum StreamCommand {
Expand All @@ -300,7 +308,7 @@ pub(crate) enum StreamCommand {

/// Possible actions as a result of incoming frame handling.
#[derive(Debug)]
enum Action {
pub(crate) enum Action {
/// Nothing to be done.
None,
/// A new stream has been opened by the remote.
Expand Down Expand Up @@ -341,7 +349,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
fn new(socket: T, cfg: Config, mode: Mode) -> Self {
let id = Id::random();
log::debug!("new connection: {} ({:?})", id, mode);
let socket = frame::Io::new(id, socket, cfg.max_buffer_size).fuse();
let socket = frame::Io::new(id, socket).fuse();
Active {
id,
mode,
Expand All @@ -356,6 +364,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
},
pending_frames: VecDeque::default(),
new_outbound_stream_waker: None,
rtt: rtt::Rtt::new(),
accumulated_max_stream_windows: Default::default(),
}
}

Expand All @@ -376,6 +386,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
loop {
if self.socket.poll_ready_unpin(cx).is_ready() {
if let Some(frame) = self.rtt.next_ping() {
self.socket.start_send_unpin(frame.into())?;
continue;
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

if let Some(frame) = self.pending_frames.pop_front() {
self.socket.start_send_unpin(frame)?;
continue;
Expand Down Expand Up @@ -439,20 +454,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
log::trace!("{}: creating new outbound stream", self.id);

let id = self.next_stream_id()?;
let extra_credit = self.config.receive_window - DEFAULT_CREDIT;

if extra_credit > 0 {
let mut frame = Frame::window_update(id, extra_credit);
frame.header_mut().syn();
log::trace!("{}/{}: sending initial {}", self.id, id, frame.header());
self.pending_frames.push_back(frame.into());
}

let mut stream = self.make_new_outbound_stream(id, self.config.receive_window);

if extra_credit == 0 {
stream.set_flag(stream::Flag::Syn)
}
let stream = self.make_new_outbound_stream(id);

log::debug!("{}: new outbound {} of {}", self.id, stream, self);
self.streams.insert(id, stream.clone_shared());
Expand Down Expand Up @@ -537,7 +539,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
log::trace!("{}: received: {}", self.id, frame.header());

if frame.header().flags().contains(header::ACK) {
if frame.header().flags().contains(header::ACK)
&& matches!(frame.header().tag(), Tag::Data | Tag::WindowUpdate)
{
let id = frame.header().stream_id();
if let Some(stream) = self.streams.get(&id) {
stream
Expand Down Expand Up @@ -620,23 +624,24 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
log::error!("{}: maximum number of streams reached", self.id);
return Action::Terminate(Frame::internal_error());
}
let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
let stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
{
let mut shared = stream.shared();
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.current_receive_window_size = shared
.current_receive_window_size
.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());
}
stream.set_flag(stream::Flag::Ack);
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream);
}

if let Some(s) = self.streams.get_mut(&stream_id) {
let mut shared = s.lock();
if frame.body().len() > shared.window as usize {
if frame.body_len() > shared.current_receive_window_size {
log::error!(
"{}/{}: frame body larger than window of stream",
self.id,
Expand All @@ -647,8 +652,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
let max_buffer_size = self.config.max_buffer_size;
if shared.buffer.len() >= max_buffer_size {
if shared.buffer.len() >= shared.max_receive_window_size as usize {
log::error!(
"{}/{}: buffer of stream grows beyond limit",
self.id,
Expand All @@ -658,7 +662,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
header.rst();
return Action::Reset(Frame::new(header));
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.current_receive_window_size = shared
.current_receive_window_size
.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());
if let Some(w) = shared.reader.take() {
w.wake()
Expand Down Expand Up @@ -718,8 +724,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}

let credit = frame.header().credit() + DEFAULT_CREDIT;
let mut stream = self.make_new_inbound_stream(stream_id, credit);
stream.set_flag(stream::Flag::Ack);
let stream = self.make_new_inbound_stream(stream_id, credit);

if is_finish {
stream
Expand All @@ -732,7 +737,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {

if let Some(s) = self.streams.get_mut(&stream_id) {
let mut shared = s.lock();
shared.credit += frame.header().credit();
shared.current_send_window_size += frame.header().credit();
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
Expand Down Expand Up @@ -761,15 +766,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
fn on_ping(&mut self, frame: &Frame<Ping>) -> Action {
let stream_id = frame.header().stream_id();
if frame.header().flags().contains(header::ACK) {
// pong
return Action::None;
return self.rtt.handle_pong(frame.nonce());
}
if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
let mut hdr = Header::ping(frame.header().nonce());
hdr.ack();
return Action::Ping(Frame::new(hdr));
}
log::trace!(
log::debug!(
"{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
self.id,
stream_id,
Expand All @@ -794,10 +798,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
waker.wake();
}

Stream::new_inbound(id, self.id, config, credit, sender)
Stream::new_inbound(
id,
self.id,
config,
credit,
sender,
self.rtt.clone(),
self.accumulated_max_stream_windows.clone(),
)
}

fn make_new_outbound_stream(&mut self, id: StreamId, window: u32) -> Stream {
fn make_new_outbound_stream(&mut self, id: StreamId) -> Stream {
let config = self.config.clone();

let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number.
Expand All @@ -806,7 +818,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
waker.wake();
}

Stream::new_outbound(id, self.id, config, window, sender)
Stream::new_outbound(
id,
self.id,
config,
sender,
self.rtt.clone(),
self.accumulated_max_stream_windows.clone(),
)
}

fn next_stream_id(&mut self) -> Result<StreamId> {
Expand Down
Loading