diff --git a/fuzz/fuzz_targets/streams.rs b/fuzz/fuzz_targets/streams.rs
index 340078e5f..e04941041 100644
--- a/fuzz/fuzz_targets/streams.rs
+++ b/fuzz/fuzz_targets/streams.rs
@@ -14,6 +14,7 @@ struct StreamParams {
     max_remote_uni: u16,
     max_remote_bi: u16,
     send_window: u16,
+    send_fairness: bool,
     receive_window: u16,
     stream_receive_window: u16,
    dir: Dir,
@@ -37,6 +38,7 @@ fuzz_target!(|input: (StreamParams, Vec<Operation>)| {
         params.max_remote_uni.into(),
         params.max_remote_bi.into(),
         params.send_window.into(),
+        params.send_fairness,
         params.receive_window.into(),
         params.stream_receive_window.into(),
     );
diff --git a/quinn-proto/src/config.rs b/quinn-proto/src/config.rs
index 57f38e160..f0377721a 100644
--- a/quinn-proto/src/config.rs
+++ b/quinn-proto/src/config.rs
@@ -42,6 +42,7 @@ pub struct TransportConfig {
     pub(crate) stream_receive_window: VarInt,
     pub(crate) receive_window: VarInt,
     pub(crate) send_window: u64,
+    pub(crate) send_fairness: bool,

     pub(crate) packet_threshold: u32,
     pub(crate) time_threshold: f32,
@@ -145,6 +146,21 @@ impl TransportConfig {
         self
     }

+    /// Whether to implement fair queuing for send streams with the same priority.
+    ///
+    /// When enabled, connections schedule data from outgoing streams of the same priority in a
+    /// round-robin fashion. When disabled, streams are scheduled in the order they are written to.
+    ///
+    /// Note that this only affects streams with the same priority. Higher-priority streams always
+    /// take precedence over lower-priority streams.
+    ///
+    /// Disabling fairness can reduce fragmentation and protocol overhead for workloads that use
+    /// many small streams.
+    pub fn send_fairness(&mut self, value: bool) -> &mut Self {
+        self.send_fairness = value;
+        self
+    }
+
     /// Maximum reordering in packet number space before FACK style loss detection considers a
     /// packet lost. Should not be less than 3, per RFC 5681.
     pub fn packet_threshold(&mut self, value: u32) -> &mut Self {
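An aside on using the new option: a minimal sketch of opting out at the quinn-proto level. Only `TransportConfig::send_fairness` comes from this diff; the `main` scaffolding is illustrative.

    use quinn_proto::TransportConfig;

    fn main() {
        // Prefer completing one stream at a time over byte-level fairness;
        // per the docs above, this trades round-robin scheduling for fewer,
        // larger contiguous chunks per stream.
        let mut config = TransportConfig::default();
        config.send_fairness(false);
        // `config` is then installed wherever a TransportConfig is accepted,
        // e.g. in an endpoint's client or server configuration.
    }
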
@@ -337,6 +353,7 @@ impl Default for TransportConfig {
             stream_receive_window: STREAM_RWND.into(),
             receive_window: VarInt::MAX,
             send_window: (8 * STREAM_RWND).into(),
+            send_fairness: true,

             packet_threshold: 3,
             time_threshold: 9.0 / 8.0,
@@ -371,6 +388,7 @@ impl fmt::Debug for TransportConfig {
             stream_receive_window,
             receive_window,
             send_window,
+            send_fairness,
             packet_threshold,
             time_threshold,
             initial_rtt,
@@ -396,6 +414,7 @@
             .field("stream_receive_window", stream_receive_window)
             .field("receive_window", receive_window)
             .field("send_window", send_window)
+            .field("send_fairness", send_fairness)
             .field("packet_threshold", packet_threshold)
             .field("time_threshold", time_threshold)
             .field("initial_rtt", initial_rtt)
diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs
index 65465fa1c..b8c989745 100644
--- a/quinn-proto/src/connection/mod.rs
+++ b/quinn-proto/src/connection/mod.rs
@@ -3213,7 +3213,9 @@ impl Connection {

         // STREAM
         if space_id == SpaceId::Data {
-            sent.stream_frames = self.streams.write_stream_frames(buf, max_size);
+            sent.stream_frames =
+                self.streams
+                    .write_stream_frames(buf, max_size, self.config.send_fairness);
             self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
         }

diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs
index d0ed2dd3c..0251ee32b 100644
--- a/quinn-proto/src/connection/streams/mod.rs
+++ b/quinn-proto/src/connection/streams/mod.rs
@@ -365,6 +365,9 @@ impl<'a> SendStream<'a> {
 /// A queue of streams with pending outgoing data, sorted by priority
 struct PendingStreamsQueue {
     streams: BinaryHeap<PendingStream>,
+    /// The next stream to write out. This is `Some` when `TransportConfig::send_fairness(false)` is set
+    /// and writing a stream was interrupted while it still had pending data. See `reinsert_pending()`.
+    next: Option<PendingStream>,
     /// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority.
     /// Underflowing is not a practical concern, as it is initialized to u64::MAX and only decremented by 1 in `push_pending`
     recency: u64,
@@ -374,29 +377,28 @@ impl PendingStreamsQueue {
     fn new() -> Self {
         Self {
             streams: BinaryHeap::new(),
+            next: None,
             recency: u64::MAX,
         }
     }

-    #[cfg(test)]
-    fn len(&self) -> usize {
-        self.streams.len()
-    }
-
-    fn clear(&mut self) {
-        self.streams.clear();
-    }
+    /// Reinsert a stream that was pending and still contains unsent data.
+    fn reinsert_pending(&mut self, id: StreamId, priority: i32) {
+        assert!(self.next.is_none());

-    fn iter(&self) -> impl Iterator<Item = &PendingStream> {
-        self.streams.iter()
-    }
-
-    fn pop(&mut self) -> Option<PendingStream> {
-        self.streams.pop()
+        self.next = Some(PendingStream {
+            priority,
+            recency: self.recency, // the value here doesn't really matter
+            id,
+        });
     }

     /// Push a pending stream ID with the given priority, queued after any already-queued streams for the priority
     fn push_pending(&mut self, id: StreamId, priority: i32) {
+        // Note that when fairness is disabled, a reinserted stream is not bumped even if
+        // `priority > next.priority`: to minimize fragmentation we always try to complete
+        // a stream once part of it has been written.
+
         // As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it
         // after all other queued streams of the same priority.
         // This is enough to implement round-robin scheduling for streams that are still pending even after being handled,
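For intuition about the recency trick in `push_pending`: a self-contained toy sketch (the `Entry` type below is illustrative, not quinn-proto's `PendingStream`) showing that a max-heap plus a decreasing counter pops equal-priority entries in FIFO, i.e. round-robin, order — and that the derived `Ord` only works because `priority` is declared before `recency`.

    use std::collections::BinaryHeap;

    // Field order matters: the derived `Ord` compares lexicographically, so
    // `priority` dominates and `recency` breaks ties (larger = queued earlier).
    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    struct Entry {
        priority: i32,
        recency: u64,
        id: u32,
    }

    fn main() {
        let mut heap = BinaryHeap::new();
        let mut recency = u64::MAX;
        for (id, priority) in [(1, 0), (2, 0), (3, 1)] {
            recency -= 1;
            heap.push(Entry { priority, recency, id });
        }
        let order: Vec<u32> = std::iter::from_fn(|| heap.pop()).map(|e| e.id).collect();
        // The high-priority stream pops first; equal priorities pop FIFO.
        assert_eq!(order, vec![3, 1, 2]);
    }
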
@@ -408,10 +410,28 @@ impl PendingStreamsQueue {
             id,
         });
     }
+
+    fn pop(&mut self) -> Option<PendingStream> {
+        self.next.take().or_else(|| self.streams.pop())
+    }
+
+    fn clear(&mut self) {
+        self.next = None;
+        self.streams.clear();
+    }
+
+    fn iter(&self) -> impl Iterator<Item = &PendingStream> {
+        self.next.iter().chain(self.streams.iter())
+    }
+
+    #[cfg(test)]
+    fn len(&self) -> usize {
+        self.streams.len()
+    }
 }

 /// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
-#[derive(PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
 struct PendingStream {
     /// The priority of the stream
     // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
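To make the `next` slot concrete: a hedged toy model of `reinsert_pending`/`pop` (simplified types, not the actual implementation) demonstrating the policy stated in the `push_pending` comment — a reinserted stream is finished before a later, higher-priority arrival is served.

    use std::collections::BinaryHeap;

    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    struct Entry {
        priority: i32,
        recency: u64,
        id: u32,
    }

    struct Queue {
        streams: BinaryHeap<Entry>,
        next: Option<Entry>, // interrupted stream; served before the heap
        recency: u64,
    }

    impl Queue {
        fn push(&mut self, id: u32, priority: i32) {
            self.recency -= 1;
            self.streams.push(Entry { priority, recency: self.recency, id });
        }
        fn reinsert(&mut self, id: u32, priority: i32) {
            self.next = Some(Entry { priority, recency: self.recency, id });
        }
        fn pop(&mut self) -> Option<Entry> {
            self.next.take().or_else(|| self.streams.pop())
        }
    }

    fn main() {
        let mut q = Queue { streams: BinaryHeap::new(), next: None, recency: u64::MAX };
        q.push(1, 0);
        let cur = q.pop().unwrap(); // stream 1 is written, but still has data left
        q.reinsert(cur.id, cur.priority);
        q.push(2, 1); // a higher-priority stream arrives in the meantime
        // Completion beats priority: the interrupted stream finishes first.
        assert_eq!(q.pop().unwrap().id, 1);
        assert_eq!(q.pop().unwrap().id, 2);
    }
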
diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs
index 93e5a3acb..e908dad46 100644
--- a/quinn-proto/src/connection/streams/state.rs
+++ b/quinn-proto/src/connection/streams/state.rs
@@ -491,6 +491,7 @@ impl StreamsState {
         &mut self,
         buf: &mut Vec<u8>,
         max_buf_size: usize,
+        fair: bool,
     ) -> StreamMetaVec {
         let mut stream_frames = StreamMetaVec::new();
         while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {
@@ -535,8 +536,13 @@ impl StreamsState {
             if stream.is_pending() {
                 // If the stream still has pending data, reinsert it, possibly with an updated priority value
                 // Fairness with other streams is achieved by implementing round-robin scheduling,
-                // so that the other streams will have a chance to write data before we touch this stream again.
-                self.pending.push_pending(id, stream.priority);
+                // so that the other streams will have a chance to write data
+                // before we touch this stream again.
+                if fair {
+                    self.pending.push_pending(id, stream.priority);
+                } else {
+                    self.pending.reinsert_pending(id, stream.priority);
+                }
             }

             let meta = frame::StreamMeta { id, offsets, fin };
@@ -1306,7 +1312,7 @@
         high.write(b"high").unwrap();

         let mut buf = Vec::with_capacity(40);
-        let meta = server.write_stream_frames(&mut buf, 40);
+        let meta = server.write_stream_frames(&mut buf, 40, true);
         assert_eq!(meta[0].id, id_high);
         assert_eq!(meta[1].id, id_mid);
         assert_eq!(meta[2].id, id_low);
@@ -1365,7 +1371,7 @@
         high.set_priority(-1).unwrap();

         let mut buf = Vec::with_capacity(1000);
-        let meta = server.write_stream_frames(&mut buf, 40);
+        let meta = server.write_stream_frames(&mut buf, 40, true);
         assert_eq!(meta.len(), 1);
         assert_eq!(meta[0].id, id_high);

@@ -1373,7 +1379,7 @@
         assert_eq!(server.pending.len(), 2);

         // Send the remaining data. The initial mid-priority stream should go first now
-        let meta = server.write_stream_frames(&mut buf, 1000);
+        let meta = server.write_stream_frames(&mut buf, 1000, true);
         assert_eq!(meta.len(), 2);
         assert_eq!(meta[0].id, id_mid);
         assert_eq!(meta[1].id, id_high);
@@ -1382,6 +1388,165 @@
         assert_eq!(server.pending.len(), 0);
     }

+    #[test]
+    fn same_stream_priority() {
+        for fair in [true, false] {
+            let mut server = make(Side::Server);
+            server.set_params(&TransportParameters {
+                initial_max_streams_bidi: 3u32.into(),
+                initial_max_data: 300u32.into(),
+                initial_max_stream_data_bidi_remote: 300u32.into(),
+                ..TransportParameters::default()
+            });
+
+            let (mut pending, state) = (Retransmits::default(), ConnState::Established);
+            let mut streams = Streams {
+                state: &mut server,
+                conn_state: &state,
+            };
+
+            // a, b and c all have the same priority
+            let id_a = streams.open(Dir::Bi).unwrap();
+            let id_b = streams.open(Dir::Bi).unwrap();
+            let id_c = streams.open(Dir::Bi).unwrap();
+
+            let mut stream_a = SendStream {
+                id: id_a,
+                state: &mut server,
+                pending: &mut pending,
+                conn_state: &state,
+            };
+            stream_a.write(&[b'a'; 100]).unwrap();
+
+            let mut stream_b = SendStream {
+                id: id_b,
+                state: &mut server,
+                pending: &mut pending,
+                conn_state: &state,
+            };
+            stream_b.write(&[b'b'; 100]).unwrap();
+
+            let mut stream_c = SendStream {
+                id: id_c,
+                state: &mut server,
+                pending: &mut pending,
+                conn_state: &state,
+            };
+            stream_c.write(&[b'c'; 100]).unwrap();
+
+            let mut metas = vec![];
+            let mut buf = Vec::with_capacity(1024);
+
+            // loop until all the streams are written
+            loop {
+                let buf_len = buf.len();
+                let meta = server.write_stream_frames(&mut buf, buf_len + 40, fair);
+                if meta.is_empty() {
+                    break;
+                }
+                metas.extend(meta);
+            }
+
+            assert!(!server.can_send_stream_data());
+            assert_eq!(server.pending.len(), 0);
+
+            let stream_ids = metas.iter().map(|m| m.id).collect::<Vec<_>>();
+            if fair {
+                // When fairness is enabled and we run out of buffer space while writing a stream,
+                // the stream is re-queued after all the other streams with the same priority.
+                assert_eq!(
+                    stream_ids,
+                    vec![id_a, id_b, id_c, id_a, id_b, id_c, id_a, id_b, id_c]
+                );
+            } else {
+                // When fairness is disabled, the stream is re-queued before all the other streams
+                // with the same priority.
+                assert_eq!(
+                    stream_ids,
+                    vec![id_a, id_a, id_a, id_b, id_b, id_b, id_c, id_c, id_c]
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn unfair_priority_bump() {
+        let mut server = make(Side::Server);
+        server.set_params(&TransportParameters {
+            initial_max_streams_bidi: 3u32.into(),
+            initial_max_data: 300u32.into(),
+            initial_max_stream_data_bidi_remote: 300u32.into(),
+            ..TransportParameters::default()
+        });
+
+        let (mut pending, state) = (Retransmits::default(), ConnState::Established);
+        let mut streams = Streams {
+            state: &mut server,
+            conn_state: &state,
+        };
+
+        // a and b have the same priority, c has a higher priority
+        let id_a = streams.open(Dir::Bi).unwrap();
+        let id_b = streams.open(Dir::Bi).unwrap();
+        let id_c = streams.open(Dir::Bi).unwrap();
+
+        let mut stream_a = SendStream {
+            id: id_a,
+            state: &mut server,
+            pending: &mut pending,
+            conn_state: &state,
+        };
+        stream_a.write(&[b'a'; 100]).unwrap();
+
+        let mut stream_b = SendStream {
+            id: id_b,
+            state: &mut server,
+            pending: &mut pending,
+            conn_state: &state,
+        };
+        stream_b.write(&[b'b'; 100]).unwrap();
+
+        let mut metas = vec![];
+        let mut buf = Vec::with_capacity(1024);
+
+        // Write the first chunk of stream_a
+        let buf_len = buf.len();
+        let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
+        assert!(!meta.is_empty());
+        metas.extend(meta);
+
+        // Queue stream_c, which has a higher priority
+        let mut stream_c = SendStream {
+            id: id_c,
+            state: &mut server,
+            pending: &mut pending,
+            conn_state: &state,
+        };
+        stream_c.set_priority(1).unwrap();
+        stream_c.write(&[b'c'; 100]).unwrap();
+
+        // loop until all the streams are written
+        loop {
+            let buf_len = buf.len();
+            let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
+            if meta.is_empty() {
+                break;
+            }
+            metas.extend(meta);
+        }
+
+        assert!(!server.can_send_stream_data());
+        assert_eq!(server.pending.len(), 0);
+
+        let stream_ids = metas.iter().map(|m| m.id).collect::<Vec<_>>();
+        assert_eq!(
+            stream_ids,
+            // stream_c bumps stream_b but not stream_a, which had already been partly
+            // written out
+            vec![id_a, id_a, id_a, id_c, id_c, id_c, id_b, id_b, id_b]
+        );
+    }
+
     #[test]
     fn stop_finished() {
         let mut client = make(Side::Client);
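Downstream of quinn-proto, the new knob composes with per-stream priorities, as the tests above exercise at the protocol level. A hedged sketch against quinn's high-level async API: `open_uni`, `set_priority`, and `write_all` are pre-existing quinn methods, not part of this diff, and error handling is collapsed into a boxed error.

    use std::sync::Arc;

    fn transport() -> Arc<quinn::TransportConfig> {
        let mut cfg = quinn::TransportConfig::default();
        cfg.send_fairness(false); // flush each small stream contiguously
        Arc::new(cfg)
    }

    async fn send(conn: &quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
        let mut bulk = conn.open_uni().await?;
        let mut urgent = conn.open_uni().await?;
        // Priorities still preempt across different levels even with fairness
        // off; only same-priority streams are affected by the new flag.
        urgent.set_priority(1)?;
        bulk.write_all(b"many small payloads").await?;
        urgent.write_all(b"time-critical").await?;
        Ok(())
    }
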