Skip to content

Commit

Permalink
Fix race condition between 0-RTT and Incoming
Browse files Browse the repository at this point in the history
Closes #1820

The fix:

- Endpoint now maintains a slab with an entry for each pending Incoming
  to buffer received data.
- ConnectionIndex now maps initial DCID to that slab key immediately
  upon construction of Incoming.
- If Incoming is accepted, association is overridden with association
  with ConnectionHandle, and all buffered datagrams are fed to newly
  constructed Connection.
- If Incoming is refused/retried/ignored, or accepting errors,
  association and slab entry are cleaned up to prevent memory leak.

Additional considerations:

- The Incoming::ignore operation can no longer be implemented as just
  dropping it. To help prevent incorrect API usage, proto::Incoming is
  modified to log a warning if it is dropped without being passed to
  Endpoint::accept/refuse/retry/ignore.
- To help protect against memory exhaustion attacks, per-Incoming
  buffered data is limited to twice the receive window or 10 KB, which-
  ever is larger. Excessive packets silently dropped.

  - Does this introduce a new vulnerability to an attack in which an
    attacker could spam a server with 0-RTT packets with the same
    connection ID as it observed a client attempting to initiate a 0-RTT
    connection to the server? I do think so.

    Is this a severe problem? Here's two reasons I don't think so:

    1. The default receive window is set to max value, so this won't
       actually kick in unless the user is already hardening against
       adverse conditions.
    2. It is already possible for an on-path attacker to distrupt a
       connection handshake if 0.5-RTT data is being used, so this
       probably doesn't actually expand the set of situations in which
       it's vulnerable to this kind of vulnerability.

    Could this be avoided? Possibly by introducing additional state to
    the buffering state to validate whether these packets are validly
    encrypted for the associated connection? However, that may risk
    making these operations costly enough that they start to defeat the
    DDOS-resistance abilities of the Incoming API.
  • Loading branch information
gretchenfrage committed Apr 14, 2024
1 parent 82a67db commit cb6bab4
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 51 deletions.
8 changes: 4 additions & 4 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use crate::{
packet::{Header, InitialHeader, InitialPacket, LongType, Packet, PartialDecode, SpaceId},
range_set::ArrayRangeSet,
shared::{
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent,
EndpointEventInner,
ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
EndpointEvent, EndpointEventInner,
},
token::ResetToken,
transport_parameters::TransportParameters,
Expand Down Expand Up @@ -989,13 +989,13 @@ impl Connection {
pub fn handle_event(&mut self, event: ConnectionEvent) {
use self::ConnectionEventInner::*;
match event.0 {
Datagram {
Datagram(DatagramConnectionEvent {
now,
remote,
ecn,
first_decode,
remaining,
} => {
}) => {
// If this packet could initiate a migration and we're a client or a server that
// forbids migration, drop the datagram. This could be relaxed to heuristically
// permit NAT-rebinding-like migration.
Expand Down
182 changes: 143 additions & 39 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::{
PartialDecode, PlainInitialHeader,
},
shared::{
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent,
EndpointEventInner, IssuedCid,
ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
EndpointEvent, EndpointEventInner, IssuedCid,
},
transport_parameters::{PreferredAddress, TransportParameters},
ResetToken, RetryToken, Side, Transmit, TransportConfig, TransportError, INITIAL_MTU,
Expand All @@ -50,6 +50,8 @@ pub struct Endpoint {
allow_mtud: bool,
/// Time at which a stateless reset was most recently sent
last_stateless_reset: Option<Instant>,
/// Buffered 0-rtt messages for pending incoming connections
incoming_buffers: Slab<IncomingBuffer>,
}

impl Endpoint {
Expand All @@ -73,6 +75,7 @@ impl Endpoint {
server_config,
allow_mtud,
last_stateless_reset: None,
incoming_buffers: Slab::new(),
}
}

Expand Down Expand Up @@ -188,17 +191,41 @@ impl Endpoint {
//

let addresses = FourTuple { remote, local_ip };
if let Some(ch) = self.index.get(&addresses, &first_decode) {
return Some(DatagramEvent::ConnectionEvent(
ch,
ConnectionEvent(ConnectionEventInner::Datagram {
now,
remote: addresses.remote,
ecn,
first_decode,
remaining,
}),
));
if let Some(route_to) = self.index.get(&addresses, &first_decode) {
let event = DatagramConnectionEvent {
now,
remote: addresses.remote,
ecn,
first_decode,
remaining,
};
match route_to {
RouteDatagramTo::Incoming(incoming_idx) => {
let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
let server_config = self.server_config.as_ref().unwrap();
let buffer_limit = (server_config.transport.receive_window.into_inner()
as usize)
.saturating_mul(2)
.max(10_000);

if let Some(total_bytes) = incoming_buffer
.total_bytes
.checked_add(datagram_len)
.filter(|&total_bytes| total_bytes <= buffer_limit)
{
incoming_buffer.datagrams.push(event);
incoming_buffer.total_bytes = total_bytes;
}

return None;
}
RouteDatagramTo::Connection(ch) => {
return Some(DatagramEvent::ConnectionEvent(
ch,
ConnectionEvent(ConnectionEventInner::Datagram(event)),
))
}
}
}

//
Expand Down Expand Up @@ -486,19 +513,26 @@ impl Endpoint {
}
};

Some(DatagramEvent::NewConnection(Incoming {
addresses,
ecn,
packet: InitialPacket {
header,
header_data: packet.header_data,
payload: packet.payload,
let incoming_idx = self.incoming_buffers.insert(Default::default());
self.index
.insert_initial_incoming(orig_dst_cid, incoming_idx);

Some(DatagramEvent::NewConnection(Incoming(Some(
IncomingInner {
addresses,
ecn,
packet: InitialPacket {
header,
header_data: packet.header_data,
payload: packet.payload,
},
rest,
crypto,
retry_src_cid,
orig_dst_cid,
incoming_idx,
},
rest,
crypto,
retry_src_cid,
orig_dst_cid,
}))
))))
}

/// Attempt to accept this incoming connection (an error may still occur)
Expand All @@ -509,6 +543,9 @@ impl Endpoint {
buf: &mut BytesMut,
server_config: Option<Arc<ServerConfig>>,
) -> Result<(ConnectionHandle, Connection), AcceptError> {
let mut incoming = incoming.0.take().unwrap();
let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);

let packet_number = incoming.packet.header.number.expand(0);
let InitialHeader {
src_cid,
Expand All @@ -519,6 +556,7 @@ impl Endpoint {

if self.cids_exhausted() {
debug!("refusing connection");
self.index.remove_initial_incoming(incoming.orig_dst_cid);
return Err(AcceptError {
cause: ConnectionError::CidsExhausted,
response: Some(self.initial_close(
Expand Down Expand Up @@ -547,6 +585,7 @@ impl Endpoint {
.is_err()
{
debug!(packet_number, "failed to authenticate initial packet");
self.index.remove_initial_incoming(incoming.orig_dst_cid);
return Err(AcceptError {
cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
response: None,
Expand Down Expand Up @@ -593,11 +632,12 @@ impl Endpoint {
tls,
Some(server_config),
transport_config,
incoming.remote_address_validated(),
incoming.retry_src_cid.is_some(),
);
if dst_cid.len() != 0 {
self.index.insert_initial(dst_cid, ch);
}

match conn.handle_first_packet(
now,
incoming.addresses.remote,
Expand All @@ -608,6 +648,11 @@ impl Endpoint {
) {
Ok(()) => {
trace!(id = ch.0, icid = %dst_cid, "new connection");

for event in incoming_buffer.datagrams {
conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
}

Ok((ch, conn))
}
Err(e) => {
Expand Down Expand Up @@ -660,6 +705,7 @@ impl Endpoint {

/// Reject this incoming connection attempt
pub fn refuse(&mut self, incoming: Incoming, buf: &mut BytesMut) -> Transmit {
let incoming = self.clean_up_incoming(incoming);
self.initial_close(
incoming.packet.header.version,
incoming.addresses,
Expand All @@ -681,6 +727,9 @@ impl Endpoint {
if incoming.remote_address_validated() {
return Err(RetryError(incoming));
}

let incoming = self.clean_up_incoming(incoming);

let server_config = self.server_config.as_ref().unwrap();

// First Initial
Expand Down Expand Up @@ -728,6 +777,21 @@ impl Endpoint {
})
}

/// Ignore this incoming connection attempt, not sending any packet in response
///
/// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
/// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
pub fn ignore(&mut self, incoming: Incoming) {
self.clean_up_incoming(incoming);
}

fn clean_up_incoming(&mut self, mut incoming: Incoming) -> IncomingInner {
let incoming = incoming.0.take().unwrap();
self.index.remove_initial_incoming(incoming.orig_dst_cid);
self.incoming_buffers.remove(incoming.incoming_idx);
incoming
}

fn add_connection(
&mut self,
ch: ConnectionHandle,
Expand Down Expand Up @@ -878,13 +942,27 @@ impl fmt::Debug for Endpoint {
}
}

/// Buffered 0-rtt messages for a pending incoming connection
#[derive(Default)]
struct IncomingBuffer {
datagrams: Vec<DatagramConnectionEvent>,
total_bytes: usize,
}

/// Part of protocol state incoming datagram can be routed to
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
Incoming(usize),
Connection(ConnectionHandle),
}

/// Maps packets to existing connections
#[derive(Default, Debug)]
struct ConnectionIndex {
/// Identifies connections based on the initial DCID the peer utilized
///
/// Uses a standard `HashMap` to protect against hash collision attacks.
connection_ids_initial: HashMap<ConnectionId, ConnectionHandle>,
connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
/// Identifies connections based on locally created CIDs
///
/// Uses a cheaper hash function since keys are locally created
Expand All @@ -901,9 +979,21 @@ struct ConnectionIndex {
}

impl ConnectionIndex {
/// Associate an incoming connection with its initial destination CID
fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
self.connection_ids_initial
.insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
}

/// Remove an association between an incoming connection and its initial destination CID
fn remove_initial_incoming(&mut self, dst_cid: ConnectionId) {
self.connection_ids_initial.remove(&dst_cid);
}

/// Associate a connection with its initial destination CID
fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
self.connection_ids_initial.insert(dst_cid, connection);
self.connection_ids_initial
.insert(dst_cid, RouteDatagramTo::Connection(connection));
}

/// Associate a connection with its first locally-chosen destination CID if used, or otherwise
Expand Down Expand Up @@ -944,10 +1034,10 @@ impl ConnectionIndex {
}

/// Find the existing connection that `datagram` should be routed to, if any
fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<ConnectionHandle> {
fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
if datagram.dst_cid().len() != 0 {
if let Some(&ch) = self.connection_ids.get(datagram.dst_cid()) {
return Some(ch);
return Some(RouteDatagramTo::Connection(ch));
}
}
if datagram.is_initial() || datagram.is_0rtt() {
Expand All @@ -957,7 +1047,7 @@ impl ConnectionIndex {
}
if datagram.dst_cid().len() == 0 {
if let Some(&ch) = self.connection_remotes.get(addresses) {
return Some(ch);
return Some(RouteDatagramTo::Connection(ch));
}
}
let data = datagram.data();
Expand All @@ -967,6 +1057,7 @@ impl ConnectionIndex {
self.connection_reset_tokens
.get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
.cloned()
.map(RouteDatagramTo::Connection)
}
}

Expand Down Expand Up @@ -1021,14 +1112,17 @@ pub enum DatagramEvent {
}

/// An incoming connection for which the server has not yet begun its part of the handshake.
pub struct Incoming {
pub struct Incoming(Option<IncomingInner>);

struct IncomingInner {
addresses: FourTuple,
ecn: Option<EcnCodepoint>,
packet: InitialPacket,
rest: Option<BytesMut>,
crypto: Keys,
retry_src_cid: Option<ConnectionId>,
orig_dst_cid: ConnectionId,
incoming_idx: usize,
}

impl Incoming {
Expand All @@ -1037,36 +1131,46 @@ impl Incoming {
///
/// This has the same behavior as [`Connection::local_ip`]
pub fn local_ip(&self) -> Option<IpAddr> {
self.addresses.local_ip
self.0.as_ref().unwrap().addresses.local_ip
}

/// The peer's UDP address.
pub fn remote_address(&self) -> SocketAddr {
self.addresses.remote
self.0.as_ref().unwrap().addresses.remote
}

/// Whether the socket address that is initiating this connection has been validated.
///
/// This means that the sender of the initial packet has proved that they can receive traffic
/// sent to `self.remote_address()`.
pub fn remote_address_validated(&self) -> bool {
self.retry_src_cid.is_some()
self.0.as_ref().unwrap().retry_src_cid.is_some()
}
}

impl fmt::Debug for Incoming {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let inner = self.0.as_ref().unwrap();
f.debug_struct("Incoming")
.field("addresses", &self.addresses)
.field("ecn", &self.ecn)
.field("addresses", &inner.addresses)
.field("ecn", &inner.ecn)
// packet doesn't implement debug
// rest is too big and not meaningful enough
.field("retry_src_cid", &self.retry_src_cid)
.field("orig_dst_cid", &self.orig_dst_cid)
.field("retry_src_cid", &inner.retry_src_cid)
.field("orig_dst_cid", &inner.orig_dst_cid)
.field("incoming_idx", &inner.incoming_idx)
.finish_non_exhaustive()
}
}

impl Drop for Incoming {
fn drop(&mut self) {
if self.0.is_some() {
warn!("quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore (may cause memory leak)");
}
}
}

/// Errors in the parameters being used to create a new connection
///
/// These arise before any I/O has been performed.
Expand Down
Loading

0 comments on commit cb6bab4

Please sign in to comment.