Skip to content

Commit

Permalink
Allow accept/reject/retry before handshake begins
Browse files Browse the repository at this point in the history
This commit removes use_retry from the server config and provides a
public API for the user to manually accept/reject/retry incoming
connections before a handshake begins, and inspect properties such as
an incoming connection's remote address and whether that address is
validated when doing so.

In quinn-proto, IncomingConnection is made public, as well as
Endpoint's accept/reject/retry methods which operate on it. The
DatagramEvent::NewConnection event is modified to return an incoming
but not yet accepted connection.

In quinn, awaiting Endpoint::accept now yields a new
quinn::IncomingConnection type, rather than quinn::Connecting. The new
quinn::IncomingConnection type has all the methods its quinn_proto
equivalent has, as well as an accept method to (fallibly) transition
it into a Connecting, and also reject, retry, and ignore methods.

Furthermore, quinn::IncomingConnection implements IntoFuture with the
output type Result<Connection, ConnectionError>>, which is the same as
the Future output type of Connecting. This lets server code which was
straightforwardly awaiting the result of quinn::Endpoint::accept work
with little to no modification.

The test accept_after_close was removed because the functionality it
was testing for no longer exists.
  • Loading branch information
gretchenfrage committed Feb 19, 2024
1 parent d841d96 commit 06f1267
Show file tree
Hide file tree
Showing 11 changed files with 492 additions and 151 deletions.
2 changes: 1 addition & 1 deletion perf/src/bin/perf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn run(opt: Opt) -> Result<()> {
Ok(())
}

async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
async fn handle(handshake: quinn::IncomingConnection, opt: Arc<Opt>) -> Result<()> {
let connection = handshake.await.context("handshake failed")?;
debug!("{} connected", connection.remote_address());
tokio::try_join!(
Expand Down
14 changes: 0 additions & 14 deletions quinn-proto/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,6 @@ pub struct ServerConfig {
/// Used to generate one-time AEAD keys to protect handshake tokens
pub(crate) token_key: Arc<dyn HandshakeTokenKey>,

/// Whether to require clients to prove ownership of an address before committing resources.
///
/// Introduces an additional round-trip to the handshake to make denial of service attacks more difficult.
pub(crate) use_retry: bool,
/// Microseconds after a stateless retry token was issued for which it's considered valid.
pub(crate) retry_token_lifetime: Duration,

Expand All @@ -769,7 +765,6 @@ impl ServerConfig {
crypto,

token_key,
use_retry: false,
retry_token_lifetime: Duration::from_secs(15),

concurrent_connections: 100_000,
Expand All @@ -790,14 +785,6 @@ impl ServerConfig {
self
}

/// Whether to require clients to prove ownership of an address before committing resources.
///
/// Introduces an additional round-trip to the handshake to make denial of service attacks more difficult.
pub fn use_retry(&mut self, value: bool) -> &mut Self {
self.use_retry = value;
self
}

/// Duration after a stateless retry token was issued for which it's considered valid.
pub fn retry_token_lifetime(&mut self, value: Duration) -> &mut Self {
self.retry_token_lifetime = value;
Expand Down Expand Up @@ -858,7 +845,6 @@ impl fmt::Debug for ServerConfig {
.field("transport", &self.transport)
.field("crypto", &"ServerConfig { elided }")
.field("token_key", &"[ elided ]")
.field("use_retry", &self.use_retry)
.field("retry_token_lifetime", &self.retry_token_lifetime)
.field("concurrent_connections", &self.concurrent_connections)
.field("migration", &self.migration)
Expand Down
61 changes: 42 additions & 19 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Endpoint {
};
return match first_decode.finish(Some(&*crypto.header.remote)) {
Ok(packet) => {
self.handle_first_packet(now, addresses, ecn, packet, remaining, crypto, buf)
self.handle_first_packet(addresses, ecn, packet, remaining, crypto, buf)
}
Err(e) => {
trace!("unable to decode initial packet: {}", e);
Expand Down Expand Up @@ -399,7 +399,6 @@ impl Endpoint {

fn handle_first_packet(
&mut self,
now: Instant,
addresses: FourTuple,
ecn: Option<EcnCodepoint>,
mut packet: Packet,
Expand Down Expand Up @@ -486,7 +485,7 @@ impl Endpoint {
}
};

let incoming = IncomingConnection {
Some(DatagramEvent::NewConnection(IncomingConnection {
addresses,
ecn,
packet,
Expand All @@ -498,19 +497,11 @@ impl Endpoint {
version,
retry_src_cid,
orig_dst_cid,
};
if server_config.use_retry && !incoming.remote_address_validated() {
Some(DatagramEvent::Response(self.retry(incoming, buf).unwrap()))
} else {
match self.accept(incoming, now, buf) {
Ok((ch, conn)) => Some(DatagramEvent::NewConnection(ch, conn)),
Err((_, response)) => response.map(DatagramEvent::Response),
}
}
}))
}

/// Attempt to accept this incoming connection (an error may still occur)
fn accept(
pub fn accept(
&mut self,
incoming: IncomingConnection,
now: Instant,
Expand Down Expand Up @@ -592,10 +583,22 @@ impl Endpoint {
}
}

/// Reject this incoming connection attempt
pub fn reject(&mut self, incoming: IncomingConnection, buf: &mut BytesMut) -> Transmit {
self.initial_close(
incoming.version,
incoming.addresses,
&incoming.crypto,
&incoming.src_cid,
TransportError::CONNECTION_REFUSED(""),
buf,
)
}

/// Respond with a retry packet, requiring the client to retry with address validation
///
/// Errors if `incoming.remote_address_validated()` is true.
fn retry(
pub fn retry(
&mut self,
incoming: IncomingConnection,
buf: &mut BytesMut,
Expand Down Expand Up @@ -953,14 +956,14 @@ impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
pub enum DatagramEvent {
/// The datagram is redirected to its `Connection`
ConnectionEvent(ConnectionHandle, ConnectionEvent),
/// The datagram has resulted in starting a new `Connection`
NewConnection(ConnectionHandle, Connection),
/// The datagram may result in starting a new `Connection`
NewConnection(IncomingConnection),
/// Response generated directly by the endpoint
Response(Transmit),
}

/// An incoming connection for which the server has not yet begun its part of the handshake.
struct IncomingConnection {
pub struct IncomingConnection {
addresses: FourTuple,
ecn: Option<EcnCodepoint>,
packet: Packet,
Expand All @@ -975,11 +978,24 @@ struct IncomingConnection {
}

impl IncomingConnection {
/// The local IP address which was used when the peer established
/// the connection
///
/// This has the same behavior as [`Connection::local_ip`]
pub fn local_ip(&self) -> Option<IpAddr> {
self.addresses.local_ip
}

/// The peer's UDP address.
pub fn remote_address(&self) -> SocketAddr {
self.addresses.remote
}

/// Whether the socket address that is initiating this connection has been validated.
///
/// This means that the sender of the initial packet has proved that they can receive traffic
/// sent to `self.remote_address()`.
fn remote_address_validated(&self) -> bool {
pub fn remote_address_validated(&self) -> bool {
self.retry_src_cid.is_some()
}
}
Expand Down Expand Up @@ -1038,7 +1054,14 @@ pub enum ConnectError {
/// validation token from a previous retry
#[derive(Debug, Error)]
#[error("retry() with validated IncomingConnection")]
struct RetryError(IncomingConnection);
pub struct RetryError(IncomingConnection);

impl RetryError {
/// Get the [`IncomingConnection`]
pub fn into_incoming(self) -> IncomingConnection {
self.0
}
}

/// Reset Tokens which are associated with peer socket addresses
///
Expand Down
4 changes: 3 additions & 1 deletion quinn-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ use crate::frame::Frame;
pub use crate::frame::{ApplicationClose, ConnectionClose, Datagram};

mod endpoint;
pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint};
pub use crate::endpoint::{
ConnectError, ConnectionHandle, DatagramEvent, Endpoint, IncomingConnection, RetryError,
};

mod shared;
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent};
Expand Down
40 changes: 25 additions & 15 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,8 @@ fn draft_version_compat() {
#[test]
fn stateless_retry() {
let _guard = subscribe();
let mut pair = Pair::new(
Default::default(),
ServerConfig {
use_retry: true,
..server_config()
},
);
let mut pair = Pair::default();
pair.server.incoming_connection_behavior = IncomingConnectionBehavior::Validate;
pair.connect();
}

Expand Down Expand Up @@ -459,13 +454,8 @@ fn high_latency_handshake() {
#[test]
fn zero_rtt_happypath() {
let _guard = subscribe();
let mut pair = Pair::new(
Default::default(),
ServerConfig {
use_retry: true,
..server_config()
},
);
let mut pair = Pair::default();
pair.server.incoming_connection_behavior = IncomingConnectionBehavior::Validate;
let config = client_config();

// Establish normal connection
Expand Down Expand Up @@ -1990,7 +1980,7 @@ fn connect_too_low_mtu() {

pair.begin_connect(client_config());
pair.drive();
pair.server.assert_no_accept()
pair.server.assert_no_accept();
}

#[test]
Expand Down Expand Up @@ -2760,3 +2750,23 @@ fn reject_new_connections() {
pair.server.assert_no_accept();
assert!(pair.client.connections.get(&client_ch).unwrap().is_closed());
}

#[test]
fn reject_manually() {
let _guard = subscribe();
let mut pair = Pair::default();
pair.server.incoming_connection_behavior = IncomingConnectionBehavior::RejectAll;

// The server should now reject incoming connections.
let client_ch = pair.begin_connect(client_config());
pair.drive();
pair.server.assert_no_accept();
let client = pair.client.connections.get_mut(&client_ch).unwrap();
assert!(client.is_closed());
assert!(matches!(
client.poll(),
Some(Event::ConnectionLost {
reason: ConnectionError::ConnectionClosed(close)
}) if close.error_code == TransportErrorCode::CONNECTION_REFUSED
))
}
82 changes: 77 additions & 5 deletions quinn-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,19 @@ pub(super) struct TestEndpoint {
pub(super) outbound: VecDeque<(Transmit, Bytes)>,
delayed: VecDeque<(Transmit, Bytes)>,
pub(super) inbound: VecDeque<(Instant, Option<EcnCodepoint>, BytesMut)>,
accepted: Option<ConnectionHandle>,
accepted: Option<Result<ConnectionHandle, ConnectionError>>,
pub(super) connections: HashMap<ConnectionHandle, Connection>,
conn_events: HashMap<ConnectionHandle, VecDeque<ConnectionEvent>>,
pub(super) captured_packets: Vec<Vec<u8>>,
pub(super) capture_inbound_packets: bool,
pub(super) incoming_connection_behavior: IncomingConnectionBehavior,
}

#[derive(Debug, Copy, Clone)]
pub(super) enum IncomingConnectionBehavior {
AcceptAll,
RejectAll,
Validate,
}

impl TestEndpoint {
Expand All @@ -318,6 +326,7 @@ impl TestEndpoint {
conn_events: HashMap::default(),
captured_packets: Vec::new(),
capture_inbound_packets: false,
incoming_connection_behavior: IncomingConnectionBehavior::AcceptAll,
}
}

Expand All @@ -340,9 +349,22 @@ impl TestEndpoint {
.handle(recv_time, remote, None, ecn, packet, &mut buf)
{
match event {
DatagramEvent::NewConnection(ch, conn) => {
self.connections.insert(ch, conn);
self.accepted = Some(ch);
DatagramEvent::NewConnection(incoming) => {
match self.incoming_connection_behavior {
IncomingConnectionBehavior::AcceptAll => {
let _ = self.try_accept(incoming, now);
}
IncomingConnectionBehavior::RejectAll => {
self.reject(incoming);
}
IncomingConnectionBehavior::Validate => {
if incoming.remote_address_validated() {
let _ = self.try_accept(incoming, now);
} else {
self.retry(incoming);
}
}
}
}
DatagramEvent::ConnectionEvent(ch, event) => {
if self.capture_inbound_packets {
Expand Down Expand Up @@ -418,8 +440,58 @@ impl TestEndpoint {
self.outbound.extend(self.delayed.drain(..));
}

pub(super) fn try_accept(
&mut self,
incoming: IncomingConnection,
now: Instant,
) -> Result<ConnectionHandle, ConnectionError> {
let mut buf = BytesMut::new();
self.endpoint
.accept(incoming, now, &mut buf)
.map(|(ch, conn)| {
self.connections.insert(ch, conn);
self.accepted = Some(Ok(ch));
ch
})
.map_err(|(e, transmit)| {
if let Some(transmit) = transmit {
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
}
self.accepted = Some(Err(e.clone()));
e
})
}

pub(super) fn retry(&mut self, incoming: IncomingConnection) {
let mut buf = BytesMut::new();
let transmit = self.endpoint.retry(incoming, &mut buf).unwrap();
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
}

pub(super) fn reject(&mut self, incoming: IncomingConnection) {
let mut buf = BytesMut::new();
let transmit = self.endpoint.reject(incoming, &mut buf);
let size = transmit.size;
self.outbound
.extend(split_transmit(transmit, buf.split_to(size).freeze()));
}

pub(super) fn assert_accept(&mut self) -> ConnectionHandle {
self.accepted.take().expect("server didn't connect")
self.accepted
.take()
.expect("server didn't try connecting")
.expect("server experienced error connecting")
}

pub(super) fn assert_accept_error(&mut self) -> ConnectionError {
self.accepted
.take()
.expect("server didn't try connecting")
.expect_err("server did unexpectedly connect without error")
}

pub(super) fn assert_no_accept(&self) {
Expand Down
Loading

0 comments on commit 06f1267

Please sign in to comment.