Skip to content

Commit

Permalink
feat!: do not close quinn::Connection on Connection drop
Browse files Browse the repository at this point in the history
Dropping `Connection` does not necessarily drop the underlying `quinn::Connection` now as it might be pooled. Calling `close` would  close it prematurely.

BREAKING CHANGE: Dropping `Connection` while some send/recv streams are still in scope no longer closes the connection. All those streams must be dropped too before the connection is closed.
  • Loading branch information
madadam committed Nov 19, 2020
1 parent 4c4e594 commit 1e5ee89
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 20 deletions.
6 changes: 0 additions & 6 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ pub struct Connection {
remover: ConnectionRemover,
}

impl Drop for Connection {
fn drop(&mut self) {
self.close();
}
}

impl Connection {
pub(crate) fn new(quic_conn: quinn::Connection, remover: ConnectionRemover) -> Self {
Self { quic_conn, remover }
Expand Down
114 changes: 100 additions & 14 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,40 +188,48 @@ async fn reuse_outgoing_connection() -> Result<()> {
let bob_addr = bob.socket_addr().await?;
let mut bob_incoming_conns = bob.listen();

// Connect for the first time and send a message.
let (conn0, incoming_messages) = alice.connect_to(&bob_addr).await?;
assert!(incoming_messages.is_some());
// Connect for the first time, send a message and then drop the connection.
let (alice_conn0, alice_incoming_messages0) = alice.connect_to(&bob_addr).await?;
assert!(alice_incoming_messages0.is_some());
let msg0 = random_msg();
conn0.send_uni(msg0.clone()).await?;
alice_conn0.send_uni(msg0.clone()).await?;
drop(alice_conn0);

// Connect for the second time and send another message. This should reuse the previously
// established connection.
let (conn1, incoming_messages) = alice.connect_to(&bob_addr).await?;
assert!(incoming_messages.is_none());
// Connect for the second time and send another message. This reuses the previously established
// connection which is kept in the pool because `incoming_messages` is still in scope.
let (alice_conn1, alice_incoming_messages1) = alice.connect_to(&bob_addr).await?;
assert!(alice_incoming_messages1.is_none());
let msg1 = random_msg();
conn1.send_uni(msg1.clone()).await?;
alice_conn1.send_uni(msg1.clone()).await?;

// Both messages should be received on the same stream.
let mut incoming_messages = bob_incoming_conns
let mut bob_incoming_messages = bob_incoming_conns
.next()
.await
.ok_or_else(|| Error::Unexpected("no incoming connection".to_string()))?;

assert_eq!(
incoming_messages
bob_incoming_messages
.next()
.await
.map(|msg| msg.get_message_data()),
Some(msg0)
);
assert_eq!(
incoming_messages
bob_incoming_messages
.next()
.await
.map(|msg| msg.get_message_data()),
Some(msg1)
);

// Drop the connection and the stream to remove the connection from the pool and close it.
drop(alice_conn1);
drop(alice_incoming_messages0);

// The connection is closed on Bob's side too.
assert!(bob_incoming_messages.next().await.is_none());

Ok(())
}

Expand Down Expand Up @@ -300,8 +308,7 @@ async fn remove_closed_connection_from_pool() -> Result<()> {
.map(|msg| msg.get_message_data()),
Some(msg0)
);
// ..and closes the stream. This removes the connection from Bob's pool and because there are
// no other live references to the connection it gets dropped...
// ..and closes the stream. This removes the connection from Bob's pool and closes it.
drop(bob_incoming_messages);

// ...which closes it on Alice's side too.
Expand Down Expand Up @@ -334,3 +341,82 @@ async fn remove_closed_connection_from_pool() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
// If both peers call `connect_to` simultaneously (that is, before any of them receives the
// others connection first), two separate connections are created. This test verifies that
// everything still works correctly even in this case.

let qp2p = new_qp2p();
let alice = qp2p.new_endpoint()?;
let alice_addr = alice.socket_addr().await?;
let mut alice_incoming_conns = alice.listen();

let bob = qp2p.new_endpoint()?;
let bob_addr = bob.socket_addr().await?;
let mut bob_incoming_conns = bob.listen();

let (alice_conn, alice_incoming_messages0) = alice.connect_to(&bob_addr).await?;
let mut alice_incoming_messages0 = alice_incoming_messages0.expect("no incoming messages");

let (bob_conn, bob_incoming_messages0) = bob.connect_to(&alice_addr).await?;
assert!(bob_incoming_messages0.is_some());

let mut alice_incoming_messages1 = alice_incoming_conns
.next()
.await
.ok_or_else(|| Error::Unexpected("no incoming connection".to_string()))?;

let mut bob_incoming_messages1 = bob_incoming_conns
.next()
.await
.ok_or_else(|| Error::Unexpected("no incoming connection".to_string()))?;

let msg0 = random_msg();
alice_conn.send_uni(msg0.clone()).await?;

let msg1 = random_msg();
bob_conn.send_uni(msg1.clone()).await?;

assert_eq!(
bob_incoming_messages1
.next()
.await
.map(|msg| msg.get_message_data()),
Some(msg0)
);

assert_eq!(
alice_incoming_messages1
.next()
.await
.map(|msg| msg.get_message_data()),
Some(msg1)
);

// Drop the connection initiated by Bob.
drop(bob_conn);
drop(bob_incoming_messages0);

// It should be closed on Alice's side too.
assert!(alice_incoming_messages1.next().await.is_none());

// Bob connects to Alice again. This does not open a new connection but returns the connection
// previously initiated by Alice from the pool.
let (bob_conn, bob_incoming_messages2) = bob.connect_to(&alice_addr).await?;
assert!(bob_incoming_messages2.is_none());

let msg2 = random_msg();
bob_conn.send_uni(msg2.clone()).await?;

assert_eq!(
alice_incoming_messages0
.next()
.await
.map(|msg| msg.get_message_data()),
Some(msg2)
);

Ok(())
}

0 comments on commit 1e5ee89

Please sign in to comment.