Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only send MAX_STREAMS when >1/8 of flow control window is consumed #1898

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,10 @@ impl Connection {
/// `count`s increase both minimum and worst-case memory consumption.
pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
self.streams.set_max_concurrent(dir, count);
// If the limit was reduced, then a flow control update previously deemed insignificant may
// now be significant.
let pending = &mut self.spaces[SpaceId::Data].pending;
self.streams.queue_max_stream_id(pending);
}

/// Current number of remotely initiated streams that may be concurrently open
Expand Down
42 changes: 21 additions & 21 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct StreamsState {
/// Maximum number of remotely-initiated streams that may be opened over the lifetime of the
/// connection so far, per direction
pub(super) max_remote: [u64; 2],
/// Value of `max_remote` most recently transmitted to the peer in a `MAX_STREAMS` frame
sent_max_remote: [u64; 2],
/// Number of streams that we've given the peer permission to open and which aren't fully closed
pub(super) allocated_remote_count: [u64; 2],
/// Size of the desired stream flow control window. May be smaller than `allocated_remote_count`
Expand Down Expand Up @@ -79,8 +81,6 @@ pub struct StreamsState {
pub(super) send_window: u64,
/// Configured upper bound for how much unacked data the peer can send us per stream
pub(super) stream_receive_window: u64,
/// Whether the corresponding `max_remote` has increased
max_streams_dirty: [bool; 2],

// Pertinent state from the TransportParameters supplied by the peer
initial_max_stream_data_uni: VarInt,
Expand Down Expand Up @@ -108,6 +108,7 @@ impl StreamsState {
next: [0, 0],
max: [0, 0],
max_remote: [max_remote_bi.into(), max_remote_uni.into()],
sent_max_remote: [max_remote_bi.into(), max_remote_uni.into()],
allocated_remote_count: [max_remote_bi.into(), max_remote_uni.into()],
max_concurrent_remote_count: [max_remote_bi.into(), max_remote_uni.into()],
flow_control_adjusted: false,
Expand All @@ -127,7 +128,6 @@ impl StreamsState {
unacked_data: 0,
send_window,
stream_receive_window: stream_receive_window.into(),
max_streams_dirty: [false, false],
initial_max_stream_data_uni: 0u32.into(),
initial_max_stream_data_bidi_local: 0u32.into(),
initial_max_stream_data_bidi_remote: 0u32.into(),
Expand Down Expand Up @@ -169,7 +169,6 @@ impl StreamsState {
}
self.allocated_remote_count[dir as usize] += new_count;
self.max_remote[dir as usize] += new_count;
self.max_streams_dirty[dir as usize] = new_count != 0;
}

pub(crate) fn zero_rtt_rejected(&mut self) {
Expand All @@ -188,7 +187,8 @@ impl StreamsState {

// If 0-RTT was rejected, any flow control frames we sent were lost.
if self.flow_control_adjusted {
self.max_streams_dirty[dir as usize] = true;
// Conservative approximation of whatever we sent in transport parameters
self.sent_max_remote[dir as usize] = 0;
}
}

Expand Down Expand Up @@ -469,7 +469,7 @@ impl StreamsState {

pending.max_stream_id[dir as usize] = false;
retransmits.get_or_create().max_stream_id[dir as usize] = true;
self.max_streams_dirty[dir as usize] = false;
self.sent_max_remote[dir as usize] = self.max_remote[dir as usize];
trace!(
value = self.max_remote[dir as usize],
"MAX_STREAMS ({:?})",
Expand Down Expand Up @@ -748,9 +748,13 @@ impl StreamsState {
pub(crate) fn queue_max_stream_id(&mut self, pending: &mut Retransmits) -> bool {
let mut queued = false;
for dir in Dir::iter() {
let dirty = mem::replace(&mut self.max_streams_dirty[dir as usize], false);
pending.max_stream_id[dir as usize] |= dirty;
queued |= dirty;
let diff = self.max_remote[dir as usize] - self.sent_max_remote[dir as usize];
// To reduce traffic, only announce updates if at least 1/8 of the flow control window
// has been consumed.
if diff > self.max_concurrent_remote_count[dir as usize] / 8 {
pending.max_stream_id[dir as usize] = true;
queued = true;
}
}
queued
}
Expand Down Expand Up @@ -921,7 +925,14 @@ mod tests {

#[test]
fn trivial_flow_control() {
let mut client = make(Side::Client);
let mut client = StreamsState::new(
Side::Client,
1u32.into(),
1u32.into(),
1024 * 1024,
(1024 * 1024u32).into(),
(1024 * 1024u32).into(),
);
let id = StreamId::new(Side::Server, Dir::Uni, 0);
let initial_max = client.local_max_data;
const MESSAGE_SIZE: usize = 2048;
Expand Down Expand Up @@ -1186,10 +1197,6 @@ mod tests {
ShouldTransmit(false)
);
assert!(!client.recv.contains_key(&id), "stream state is freed");
assert!(
client.max_streams_dirty[Dir::Uni as usize],
"stream credit is issued"
);
assert_eq!(client.max_remote[Dir::Uni as usize], prev_max + 1);
}

Expand Down Expand Up @@ -1473,8 +1480,6 @@ mod tests {
};
stream.stop(0u32.into()).unwrap();

assert!(client.max_streams_dirty[Dir::Uni as usize]);

// Open stream 128
assert_eq!(
client.received(
Expand Down Expand Up @@ -1526,8 +1531,6 @@ mod tests {
// Relax limit by one
client.set_max_concurrent(Dir::Uni, 129u32.into());

assert!(client.max_streams_dirty[Dir::Uni as usize]);

// Open stream 128
assert_eq!(
client.received(
Expand Down Expand Up @@ -1571,7 +1574,6 @@ mod tests {
pending: &mut pending,
};
stream.stop(0u32.into()).unwrap();
assert!(!client.max_streams_dirty[Dir::Uni as usize]);

// Try to open stream 128, still exceeding limit
assert_eq!(
Expand Down Expand Up @@ -1607,8 +1609,6 @@ mod tests {
};
stream.stop(0u32.into()).unwrap();

assert!(client.max_streams_dirty[Dir::Uni as usize]);

// Open stream 128
assert_eq!(
client.received(
Expand Down