Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make quinn_proto::Connection and quinn_proto::Endpoint impl Sync #1769

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quinn-proto/src/cid_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::shared::ConnectionId;
use crate::MAX_CID_SIZE;

/// Generates connection IDs for incoming connections
pub trait ConnectionIdGenerator: Send {
pub trait ConnectionIdGenerator: Send + Sync {
/// Generates a new CID
///
/// Connection IDs MUST NOT contain any information that can be used by
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/src/congestion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub use cubic::{Cubic, CubicConfig};
pub use new_reno::{NewReno, NewRenoConfig};

/// Common interface for different congestion controllers
pub trait Controller: Send {
pub trait Controller: Send + Sync {
/// One or more packets were just sent
#[allow(unused_variables)]
fn on_sent(&mut self, now: Instant, bytes: u64, last_packet_number: u64) {}
Expand Down
97 changes: 44 additions & 53 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
cell::RefCell,
collections::{hash_map, BinaryHeap, VecDeque},
};
use std::collections::{hash_map, BinaryHeap};

use bytes::Bytes;
use thiserror::Error;
Expand Down Expand Up @@ -237,7 +234,7 @@ impl<'a> SendStream<'a> {
self.state.unacked_data += written.bytes as u64;
trace!(stream = %self.id, "wrote {} bytes", written.bytes);
if !was_pending {
push_pending(&mut self.state.pending, self.id, stream.priority);
self.state.pending.push_pending(self.id, stream.priority);
}
Ok(written)
}
Expand Down Expand Up @@ -268,7 +265,7 @@ impl<'a> SendStream<'a> {
let was_pending = stream.is_pending();
stream.finish()?;
if !was_pending {
push_pending(&mut self.state.pending, self.id, stream.priority);
self.state.pending.push_pending(self.id, stream.priority);
}

Ok(())
Expand Down Expand Up @@ -335,60 +332,54 @@ impl<'a> SendStream<'a> {
}
}

fn push_pending(pending: &mut BinaryHeap<PendingLevel>, id: StreamId, priority: i32) {
for level in pending.iter() {
if priority == level.priority {
level.queue.borrow_mut().push_back(id);
return;
}
}

// If there is only a single level and it's empty, repurpose it for the
// required priority
if pending.len() == 1 {
if let Some(mut first) = pending.peek_mut() {
let mut queue = first.queue.borrow_mut();
if queue.is_empty() {
queue.push_back(id);
drop(queue);
first.priority = priority;
return;
}
}
}

let mut queue = VecDeque::new();
queue.push_back(id);
pending.push(PendingLevel {
queue: RefCell::new(queue),
priority,
});
/// A queue of streams with pending outgoing data, sorted by priority
struct PendingStreamsQueue {
    /// Max-heap of pending streams; `PendingStream`'s derived `Ord` compares `priority`
    /// first and `recency` second, so popping yields the highest-priority, then
    /// most-recent (largest `recency` value) stream
    streams: BinaryHeap<PendingStream>,
    /// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority.
    /// Underflowing is not a practical concern, as it is initialized to u64::MAX and only decremented by 1 in `push_pending`
    recency: u64,
}

struct PendingLevel {
// RefCell is needed because BinaryHeap doesn't have an iter_mut()
queue: RefCell<VecDeque<StreamId>>,
priority: i32,
}

impl PartialEq for PendingLevel {
fn eq(&self, other: &Self) -> bool {
self.priority.eq(&other.priority)
impl PendingStreamsQueue {
fn new() -> Self {
Self {
streams: BinaryHeap::new(),
recency: u64::MAX,
}
}
}

impl PartialOrd for PendingLevel {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
/// Push a pending stream ID with the given priority, queued after any already-queued streams for the priority
fn push_pending(&mut self, id: StreamId, priority: i32) {
// As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it
// after all other queued streams of the same priority.
// This is enough to implement round-robin scheduling for streams that are still pending even after being handled,
// as in that case they are removed from the `BinaryHeap`, handled, and then immediately reinserted.
self.recency -= 1;
self.streams.push(PendingStream {
priority,
recency: self.recency,
id,
});
}
}

impl Eq for PendingLevel {}

impl Ord for PendingLevel {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.priority.cmp(&other.priority)
}
/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
// The derived `Ord` compares fields top-to-bottom (lexicographically), so field order here is
// load-bearing: `priority` must come before `recency`, and both before `id`
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct PendingStream {
    /// The priority of the stream
    // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
    // (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
    priority: i32,
    /// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
    /// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know
    /// that it currently has the highest recency value, so it is deprioritized by setting its recency to 1 less than the
    /// previous lowest recency value, such that all other streams of this priority will get processed once before we get back
    /// round to this one
    recency: u64,
    /// The ID of the stream
    // The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below
    // the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive
    id: StreamId,
}

/// Application events about streams
Expand Down
86 changes: 29 additions & 57 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{binary_heap::PeekMut, hash_map, BinaryHeap, VecDeque},
collections::{hash_map, VecDeque},
convert::TryFrom,
mem,
};
Expand All @@ -9,7 +9,7 @@ use rustc_hash::FxHashMap;
use tracing::{debug, trace};

use super::{
push_pending, PendingLevel, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent,
PendingStreamsQueue, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent,
StreamHalf, ThinRetransmits,
};
use crate::{
Expand Down Expand Up @@ -52,8 +52,8 @@ pub struct StreamsState {
/// This differs from `self.send.len()` in that it does not include streams that the peer is
/// permitted to open but which have not yet been opened.
pub(super) send_streams: usize,
/// Streams with outgoing data queued
pub(super) pending: BinaryHeap<PendingLevel>,
/// Streams with outgoing data queued, sorted by priority
pub(super) pending: PendingStreamsQueue,

events: VecDeque<StreamEvent>,
/// Streams blocked on connection-level flow control or stream window space
Expand Down Expand Up @@ -115,7 +115,7 @@ impl StreamsState {
opened: [false, false],
next_reported_remote: [0, 0],
send_streams: 0,
pending: BinaryHeap::new(),
pending: PendingStreamsQueue::new(),
events: VecDeque::new(),
connection_blocked: Vec::new(),
max_data: 0,
Expand Down Expand Up @@ -192,7 +192,7 @@ impl StreamsState {
}
}

self.pending.clear();
self.pending.streams.clear();
self.send_streams = 0;
self.data_sent = 0;
self.connection_blocked.clear();
Expand Down Expand Up @@ -345,13 +345,11 @@ impl StreamsState {
/// Whether any stream data is queued, regardless of control frames
pub(crate) fn can_send_stream_data(&self) -> bool {
// Reset streams may linger in the pending stream list, but will never produce stream frames
self.pending.iter().any(|level| {
level.queue.borrow().iter().any(|id| {
self.send
.get(id)
.and_then(|s| s.as_ref())
.map_or(false, |s| !s.is_reset())
})
self.pending.streams.iter().any(|stream| {
self.send
.get(&stream.id)
.and_then(|s| s.as_ref())
.map_or(false, |s| !s.is_reset())
})
}

Expand Down Expand Up @@ -503,26 +501,14 @@ impl StreamsState {
break;
}

let num_levels = self.pending.len();
let mut level = match self.pending.peek_mut() {
Some(x) => x,
None => break,
};
// Popping data from the front of the queue, storing as much data
// as possible in a single frame, and enqueuing sending further
// remaining data at the end of the queue helps with fairness.
// Other streams will have a chance to write data before we touch
// this stream again.
let id = match level.queue.get_mut().pop_front() {
Some(x) => x,
None => {
debug_assert!(
num_levels == 1,
"An empty queue is only allowed for a single level"
);
break;
}
// Pop the stream of the highest priority that currently has pending data
// If the stream still has some pending data left after writing, it will be reinserted, otherwise not
let Some(stream) = self.pending.streams.pop() else {
break;
};

let id = stream.id;

let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
Some(s) => s,
// Stream was reset with pending data and the reset was acknowledged
Expand All @@ -547,24 +533,10 @@ impl StreamsState {
}

if stream.is_pending() {
if level.priority == stream.priority {
// Enqueue for the same level
level.queue.get_mut().push_back(id);
} else {
// Enqueue for a different level. If the current level is empty, drop it
if level.queue.borrow().is_empty() && num_levels != 1 {
// We keep the last level around even in empty form so that
// the next insert doesn't have to reallocate the queue
PeekMut::pop(level);
} else {
drop(level);
}
push_pending(&mut self.pending, id, stream.priority);
}
} else if level.queue.borrow().is_empty() && num_levels != 1 {
// We keep the last level around even in empty form so that
// the next insert doesn't have to reallocate the queue
PeekMut::pop(level);
// If the stream still has pending data, reinsert it, possibly with an updated priority value
// Fairness with other streams is achieved by implementing round-robin scheduling,
// so that the other streams will have a chance to write data before we touch this stream again.
self.pending.push_pending(id, stream.priority);
}

let meta = frame::StreamMeta { id, offsets, fin };
Expand Down Expand Up @@ -644,7 +616,7 @@ impl StreamsState {
Some(x) => x,
};
if !stream.is_pending() {
push_pending(&mut self.pending, frame.id, stream.priority);
self.pending.push_pending(frame.id, stream.priority);
}
stream.fin_pending |= frame.fin;
stream.pending.retransmit(frame.offsets);
Expand All @@ -664,7 +636,7 @@ impl StreamsState {
continue;
}
if !stream.is_pending() {
push_pending(&mut self.pending, id, stream.priority);
self.pending.push_pending(id, stream.priority);
}
stream.pending.retransmit_all_for_0rtt();
}
Expand Down Expand Up @@ -1324,7 +1296,7 @@ mod tests {
assert_eq!(meta[2].id, id_low);

assert!(!server.can_send_stream_data());
assert_eq!(server.pending.len(), 1);
assert_eq!(server.pending.streams.len(), 0);
}

#[test]
Expand Down Expand Up @@ -1353,7 +1325,7 @@ mod tests {
conn_state: &state,
};
assert_eq!(mid.write(b"mid").unwrap(), 3);
assert_eq!(server.pending.len(), 1);
assert_eq!(server.pending.streams.len(), 1);

let mut high = SendStream {
id: id_high,
Expand All @@ -1363,7 +1335,7 @@ mod tests {
};
high.set_priority(1).unwrap();
assert_eq!(high.write(&[0; 200]).unwrap(), 200);
assert_eq!(server.pending.len(), 2);
assert_eq!(server.pending.streams.len(), 2);

// Requeue the high priority stream to lowest priority. The initial send
// still uses high priority since it's queued that way. After that it will
Expand All @@ -1382,7 +1354,7 @@ mod tests {
assert_eq!(meta[0].id, id_high);

// After requeuing we should end up with 2 priorities - not 3
assert_eq!(server.pending.len(), 2);
assert_eq!(server.pending.streams.len(), 2);

// Send the remaining data. The initial mid priority one should go first now
let meta = server.write_stream_frames(&mut buf, 1000);
Expand All @@ -1391,7 +1363,7 @@ mod tests {
assert_eq!(meta[1].id, id_high);

assert!(!server.can_send_stream_data());
assert_eq!(server.pending.len(), 1);
assert_eq!(server.pending.streams.len(), 0);
}

#[test]
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) mod ring;
pub mod rustls;

/// A cryptographic session (commonly TLS)
pub trait Session: Send + 'static {
pub trait Session: Send + Sync + 'static {
/// Create the initial set of keys given the client's initial destination ConnectionId
fn initial_keys(&self, dst_cid: &ConnectionId, side: Side) -> Keys;

Expand Down Expand Up @@ -146,7 +146,7 @@ pub trait ServerConfig: Send + Sync {
}

/// Keys used to protect packet payloads
pub trait PacketKey: Send {
pub trait PacketKey: Send + Sync {
/// Encrypt the packet payload with the given packet number
fn encrypt(&self, packet: u64, buf: &mut [u8], header_len: usize);
/// Decrypt the packet payload with the given packet number
Expand All @@ -166,7 +166,7 @@ pub trait PacketKey: Send {
}

/// Keys used to protect packet headers
pub trait HeaderKey: Send {
pub trait HeaderKey: Send + Sync {
/// Decrypt the given packet's header
fn decrypt(&self, pn_offset: usize, packet: &mut [u8]);
/// Encrypt the given packet's header
Expand Down
7 changes: 7 additions & 0 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2828,3 +2828,10 @@ fn reject_manually() {
}) if close.error_code == TransportErrorCode::CONNECTION_REFUSED
));
}

#[test]
fn endpoint_and_connection_impl_send_sync() {
    // Compile-time assertion: this test passes by virtue of compiling, since the
    // helper can only be instantiated for types satisfying `Send + Sync`.
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<Endpoint>();
    assert_send_sync::<Connection>();
}