PendingStreams: add alternative (unfair) send stream scheduling strategy
This adds TransportConfig::send_fairness(bool). When set to false,
streams are still scheduled based on priority, but once a chunk of a
stream has been written out, we'll try to complete the stream instead of
trying to round-robin balance it among the streams with the same
priority.

This reduces fragmentation, protocol overhead and stream receive latency
when sending many small streams. It also sends same-priority streams in
the order they are opened. Assuming little to no network packet
reordering, this allows receivers to advertise a large stream window
while keeping a smaller, sliding receive window.
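A minimal usage sketch, assuming quinn-proto's public API as introduced by this commit (attaching the config to an endpoint or connection is omitted):

use quinn_proto::TransportConfig;

let mut transport = TransportConfig::default();
// Keep priority-based scheduling, but once a chunk of a stream has been
// written out, try to complete that stream before moving on to other
// streams of the same priority.
transport.send_fairness(false);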
alessandrod authored and Ralith committed Oct 12, 2024
1 parent 62f1818 commit 9d63e62
Showing 4 changed files with 216 additions and 10 deletions.
19 changes: 19 additions & 0 deletions quinn-proto/src/config.rs
@@ -42,6 +42,7 @@ pub struct TransportConfig {
pub(crate) stream_receive_window: VarInt,
pub(crate) receive_window: VarInt,
pub(crate) send_window: u64,
pub(crate) send_fairness: bool,

pub(crate) packet_threshold: u32,
pub(crate) time_threshold: f32,
@@ -145,6 +146,21 @@ impl TransportConfig {
self
}

/// Whether to implement fair queuing for send streams with the same priority.
///
/// When enabled, connections schedule data from outgoing streams with the same priority in a
/// round-robin fashion. When disabled, streams are scheduled in the order they are written to.
///
/// Note that this only affects streams with the same priority. Higher priority streams always
/// take precedence over lower priority streams.
///
/// Disabling fairness can reduce fragmentation and protocol overhead for workloads that use
/// many small streams.
pub fn send_fairness(&mut self, value: bool) -> &mut Self {
self.send_fairness = value;
self
}

/// Maximum reordering in packet number space before FACK style loss detection considers a
/// packet lost. Should not be less than 3, per RFC5681.
pub fn packet_threshold(&mut self, value: u32) -> &mut Self {
@@ -337,6 +353,7 @@ impl Default for TransportConfig {
stream_receive_window: STREAM_RWND.into(),
receive_window: VarInt::MAX,
send_window: (8 * STREAM_RWND).into(),
send_fairness: true,

packet_threshold: 3,
time_threshold: 9.0 / 8.0,
@@ -371,6 +388,7 @@ impl fmt::Debug for TransportConfig {
stream_receive_window,
receive_window,
send_window,
send_fairness,
packet_threshold,
time_threshold,
initial_rtt,
@@ -396,6 +414,7 @@
.field("stream_receive_window", stream_receive_window)
.field("receive_window", receive_window)
.field("send_window", send_window)
.field("send_fairness", send_fairness)
.field("packet_threshold", packet_threshold)
.field("time_threshold", time_threshold)
.field("initial_rtt", initial_rtt)
4 changes: 3 additions & 1 deletion quinn-proto/src/connection/mod.rs
@@ -3216,7 +3216,9 @@ impl Connection {

// STREAM
if space_id == SpaceId::Data {
sent.stream_frames = self.streams.write_stream_frames(buf, max_size);
sent.stream_frames =
self.streams
.write_stream_frames(buf, max_size, self.config.send_fairness);
self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
}

28 changes: 24 additions & 4 deletions quinn-proto/src/connection/streams/mod.rs
@@ -365,6 +365,9 @@ impl<'a> SendStream<'a> {
/// A queue of streams with pending outgoing data, sorted by priority
struct PendingStreamsQueue {
streams: BinaryHeap<PendingStream>,
/// The next stream to write out. This is `Some` when fairness is disabled (`TransportConfig::send_fairness(false)`)
/// and writing a stream was interrupted while it still had pending data. See `reinsert_pending()`.
next: Option<PendingStream>,
/// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority.
/// Underflowing is not a practical concern, as it is initialized to u64::MAX and only decremented by 1 in `push_pending`.
recency: u64,
@@ -374,12 +377,28 @@ impl PendingStreamsQueue {
fn new() -> Self {
Self {
streams: BinaryHeap::new(),
next: None,
recency: u64::MAX,
}
}

/// Reinsert a stream that was pending and still contains unsent data.
fn reinsert_pending(&mut self, id: StreamId, priority: i32) {
assert!(self.next.is_none());

self.next = Some(PendingStream {
priority,
recency: self.recency, // exact value is irrelevant: `pop()` returns `next` before consulting the heap
id,
});
}

/// Push a pending stream ID with the given priority, queued after any already-queued streams of that priority
fn push_pending(&mut self, id: StreamId, priority: i32) {
// Note that when fairness is disabled, a reinserted stream is never bumped, even if
// priority > next.priority: to minimize fragmentation we always try to complete a
// stream once part of it has been written.

// As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it
// after all other queued streams of the same priority.
// This is enough to implement round-robin scheduling for streams that are still pending even after being handled,
@@ -393,25 +412,26 @@
}

fn pop(&mut self) -> Option<PendingStream> {
self.streams.pop()
self.next.take().or_else(|| self.streams.pop())
}

fn clear(&mut self) {
self.next = None;
self.streams.clear();
}

fn iter(&self) -> impl Iterator<Item = &PendingStream> {
self.streams.iter()
self.next.iter().chain(self.streams.iter())
}

#[cfg(test)]
fn len(&self) -> usize {
self.streams.len()
self.streams.len() + self.next.is_some() as usize
}
}

/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
#[derive(PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
struct PendingStream {
/// The priority of the stream
// Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
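Why a monotonically decreasing recency counter yields FIFO order among equal-priority streams: `PendingStream`'s derived `Ord` compares `priority` first and `recency` second, and `BinaryHeap` is a max-heap, so among equal priorities the earlier-pushed entry (larger recency value) pops first. A minimal standalone sketch of that ordering (the `Entry` type here is illustrative, not the crate's internals):

use std::collections::BinaryHeap;

#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Entry {
    priority: i32, // compared first by the derived `Ord`
    recency: u64,  // compared second; larger means pushed earlier
}

fn main() {
    let mut heap = BinaryHeap::new();
    let mut recency = u64::MAX;
    for priority in [0, 0, 1] {
        heap.push(Entry { priority, recency });
        recency -= 1; // later pushes sort lower within a priority
    }
    // The priority-1 entry pops first; the two priority-0 entries
    // then pop in the order they were pushed (FIFO).
    assert_eq!(heap.pop(), Some(Entry { priority: 1, recency: u64::MAX - 2 }));
    assert_eq!(heap.pop(), Some(Entry { priority: 0, recency: u64::MAX }));
    assert_eq!(heap.pop(), Some(Entry { priority: 0, recency: u64::MAX - 1 }));
}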
175 changes: 170 additions & 5 deletions quinn-proto/src/connection/streams/state.rs
@@ -491,6 +491,7 @@ impl StreamsState {
&mut self,
buf: &mut Vec<u8>,
max_buf_size: usize,
fair: bool,
) -> StreamMetaVec {
let mut stream_frames = StreamMetaVec::new();
while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {
@@ -535,8 +536,13 @@
if stream.is_pending() {
// If the stream still has pending data, reinsert it, possibly with an updated priority value
// Fairness with other streams is achieved by implementing round-robin scheduling,
// so that the other streams will have a chance to write data before we touch this stream again.
self.pending.push_pending(id, stream.priority);
// so that the other streams will have a chance to write data
// before we touch this stream again.
if fair {
self.pending.push_pending(id, stream.priority);
} else {
self.pending.reinsert_pending(id, stream.priority);
}
}

let meta = frame::StreamMeta { id, offsets, fin };
@@ -1306,7 +1312,7 @@ mod tests {
high.write(b"high").unwrap();

let mut buf = Vec::with_capacity(40);
let meta = server.write_stream_frames(&mut buf, 40);
let meta = server.write_stream_frames(&mut buf, 40, true);
assert_eq!(meta[0].id, id_high);
assert_eq!(meta[1].id, id_mid);
assert_eq!(meta[2].id, id_low);
@@ -1365,15 +1371,15 @@
high.set_priority(-1).unwrap();

let mut buf = Vec::with_capacity(1000);
let meta = server.write_stream_frames(&mut buf, 40);
let meta = server.write_stream_frames(&mut buf, 40, true);
assert_eq!(meta.len(), 1);
assert_eq!(meta[0].id, id_high);

// After requeuing we should end up with 2 priorities - not 3
assert_eq!(server.pending.len(), 2);

// Send the remaining data. The initial mid priority one should go first now
let meta = server.write_stream_frames(&mut buf, 1000);
let meta = server.write_stream_frames(&mut buf, 1000, true);
assert_eq!(meta.len(), 2);
assert_eq!(meta[0].id, id_mid);
assert_eq!(meta[1].id, id_high);
@@ -1382,6 +1388,165 @@
assert_eq!(server.pending.len(), 0);
}

#[test]
fn same_stream_priority() {
for fair in [true, false] {
let mut server = make(Side::Server);
server.set_params(&TransportParameters {
initial_max_streams_bidi: 3u32.into(),
initial_max_data: 300u32.into(),
initial_max_stream_data_bidi_remote: 300u32.into(),
..TransportParameters::default()
});

let (mut pending, state) = (Retransmits::default(), ConnState::Established);
let mut streams = Streams {
state: &mut server,
conn_state: &state,
};

// a, b and c all have the same priority
let id_a = streams.open(Dir::Bi).unwrap();
let id_b = streams.open(Dir::Bi).unwrap();
let id_c = streams.open(Dir::Bi).unwrap();

let mut stream_a = SendStream {
id: id_a,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
stream_a.write(&[b'a'; 100]).unwrap();

let mut stream_b = SendStream {
id: id_b,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
stream_b.write(&[b'b'; 100]).unwrap();

let mut stream_c = SendStream {
id: id_c,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
stream_c.write(&[b'c'; 100]).unwrap();

let mut metas = vec![];
let mut buf = Vec::with_capacity(1024);

// loop until all the streams are written
loop {
let buf_len = buf.len();
let meta = server.write_stream_frames(&mut buf, buf_len + 40, fair);
if meta.is_empty() {
break;
}
metas.extend(meta);
}

assert!(!server.can_send_stream_data());
assert_eq!(server.pending.len(), 0);

let stream_ids = metas.iter().map(|m| m.id).collect::<Vec<_>>();
if fair {
// When fairness is enabled, if we run out of buffer space to write out a stream,
// the stream is re-queued after all the streams with the same priority.
assert_eq!(
stream_ids,
vec![id_a, id_b, id_c, id_a, id_b, id_c, id_a, id_b, id_c]
);
} else {
// When fairness is disabled the stream is re-queued before all the other streams
// with the same priority.
assert_eq!(
stream_ids,
vec![id_a, id_a, id_a, id_b, id_b, id_b, id_c, id_c, id_c]
);
}
}
}

#[test]
fn unfair_priority_bump() {
let mut server = make(Side::Server);
server.set_params(&TransportParameters {
initial_max_streams_bidi: 3u32.into(),
initial_max_data: 300u32.into(),
initial_max_stream_data_bidi_remote: 300u32.into(),
..TransportParameters::default()
});

let (mut pending, state) = (Retransmits::default(), ConnState::Established);
let mut streams = Streams {
state: &mut server,
conn_state: &state,
};

// a and b have the same priority; c has higher priority
let id_a = streams.open(Dir::Bi).unwrap();
let id_b = streams.open(Dir::Bi).unwrap();
let id_c = streams.open(Dir::Bi).unwrap();

let mut stream_a = SendStream {
id: id_a,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
stream_a.write(&[b'a'; 100]).unwrap();

let mut stream_b = SendStream {
id: id_b,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
stream_b.write(&[b'b'; 100]).unwrap();

let mut metas = vec![];
let mut buf = Vec::with_capacity(1024);

// Write the first chunk of stream_a
let buf_len = buf.len();
let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
assert!(!meta.is_empty());
metas.extend(meta);

// Queue stream_c which has higher priority
let mut stream_c = SendStream {
id: id_c,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
stream_c.set_priority(1).unwrap();
stream_c.write(&[b'c'; 100]).unwrap();

// loop until all the streams are written
loop {
let buf_len = buf.len();
let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
if meta.is_empty() {
break;
}
metas.extend(meta);
}

assert!(!server.can_send_stream_data());
assert_eq!(server.pending.len(), 0);

let stream_ids = metas.iter().map(|m| m.id).collect::<Vec<_>>();
assert_eq!(
stream_ids,
// stream_c bumps stream_b but doesn't bump stream_a, which had already been partly
// written out
vec![id_a, id_a, id_a, id_c, id_c, id_c, id_b, id_b, id_b]
);
}

#[test]
fn stop_finished() {
let mut client = make(Side::Client);
