Skip to content

Commit

Permalink
refactor!: remove DisconnectionEvents
Browse files Browse the repository at this point in the history
This removes the `DisconnectionEvents` API, and all the plumbing that
makes it work. This is part of the move to a fully connection-oriented
API. In that case, callers would always be working with a handle to a
connection, and so will see if disconnections occur.

BREAKING CHANGE: `DisconnectionEvents` has been removed, and is no
longer returned from `Endpoint::new` or `Endpoint::new_client`.
  • Loading branch information
Chris Connelly authored and joshuef committed Oct 15, 2021
1 parent 1b7c000 commit 93e70a7
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 158 deletions.
19 changes: 9 additions & 10 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,15 @@ async fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();

// create an endpoint for us to listen on and send from.
let (node, _incoming_conns, mut incoming_messages, _disconnections, _contact) =
Endpoint::<XId>::new(
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
&[],
Config {
idle_timeout: Duration::from_secs(60 * 60).into(), // 1 hour idle timeout.
..Default::default()
},
)
.await?;
let (node, _incoming_conns, mut incoming_messages, _contact) = Endpoint::<XId>::new(
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
&[],
Config {
idle_timeout: Duration::from_secs(60 * 60).into(), // 1 hour idle timeout.
..Default::default()
},
)
.await?;

// if we received args then we parse them as SocketAddr and send a "marco" msg to each peer.
if args.len() > 1 {
Expand Down
21 changes: 2 additions & 19 deletions src/connection_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::{
};
use bytes::Bytes;
use futures::stream::StreamExt;
use std::{fmt::Debug, net::SocketAddr, sync::Arc};
use tokio::sync::mpsc::{Receiver, Sender};
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::mpsc::Sender;
use tracing::{trace, warn};

/// A connection between two [`Endpoint`]s.
Expand All @@ -38,18 +38,6 @@ pub struct ConnectionHandle<I: ConnId> {
remover: ConnectionRemover<I>,
}

/// Disconnection events, and the result that led to disconnection.
#[derive(Debug)]
pub struct DisconnectionEvents(pub Receiver<SocketAddr>);

/// Disconnection
impl DisconnectionEvents {
/// Blocks until there is a disconnection event and returns the address of the disconnected peer
pub async fn next(&mut self) -> Option<SocketAddr> {
self.0.recv().await
}
}

impl<I: ConnId> ConnectionHandle<I> {
pub(crate) fn new(
inner: Connection,
Expand Down Expand Up @@ -118,13 +106,11 @@ impl<I: ConnId> ConnectionHandle<I> {
}
}

#[allow(clippy::too_many_arguments)] // this will be removed soon, so let is pass for now
pub(super) fn listen_for_incoming_connections<I: ConnId>(
mut quinn_incoming: quinn::Incoming,
connection_pool: ConnectionPool<I>,
message_tx: Sender<(ConnectionHandle<I>, Bytes)>,
connection_tx: Sender<ConnectionHandle<I>>,
disconnection_tx: Sender<SocketAddr>,
endpoint: Endpoint<I>,
quic_endpoint: quinn::Endpoint,
retry_config: Arc<RetryConfig>,
Expand All @@ -151,7 +137,6 @@ pub(super) fn listen_for_incoming_connections<I: ConnId>(
connection,
connection_incoming,
message_tx.clone(),
disconnection_tx.clone(),
);
}
Err(err) => {
Expand All @@ -171,7 +156,6 @@ pub(super) fn listen_for_incoming_messages<I: ConnId>(
connection: ConnectionHandle<I>,
mut connection_incoming: ConnectionIncoming,
message_tx: Sender<(ConnectionHandle<I>, Bytes)>,
disconnection_tx: Sender<SocketAddr>,
) {
let _ = tokio::spawn(async move {
let src = connection.remote_address();
Expand All @@ -191,7 +175,6 @@ pub(super) fn listen_for_incoming_messages<I: ConnId>(
}

connection.remover.remove().await;
let _ = disconnection_tx.send(src).await;

trace!("The connection to {} has terminated", src);
});
Expand Down
25 changes: 6 additions & 19 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use super::{
connection_deduplicator::{ConnectionDeduplicator, DedupHandle},
connection_handle::{
listen_for_incoming_connections, listen_for_incoming_messages, ConnectionHandle,
DisconnectionEvents,
},
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
error::{
Expand Down Expand Up @@ -80,7 +79,6 @@ pub struct Endpoint<I: ConnId> {
retry_config: Arc<RetryConfig>,

message_tx: MpscSender<(ConnectionHandle<I>, Bytes)>,
disconnection_tx: MpscSender<SocketAddr>,
termination_tx: Sender<()>,
connection_pool: ConnectionPool<I>,
connection_deduplicator: ConnectionDeduplicator,
Expand Down Expand Up @@ -133,7 +131,6 @@ impl<I: ConnId> Endpoint<I> {
Self,
IncomingConnections<I>,
IncomingMessages<I>,
DisconnectionEvents,
Option<ConnectionHandle<I>>,
),
EndpointError,
Expand Down Expand Up @@ -175,7 +172,6 @@ impl<I: ConnId> Endpoint<I> {
endpoint.connection_pool.clone(),
channels.message.0.clone(),
channels.connection.0,
channels.disconnection.0.clone(),
endpoint.clone(),
endpoint.quic_endpoint.clone(),
endpoint.retry_config.clone(),
Expand All @@ -198,7 +194,6 @@ impl<I: ConnId> Endpoint<I> {
endpoint,
IncomingConnections(channels.connection.1),
IncomingMessages(channels.message.1),
DisconnectionEvents(channels.disconnection.1),
contact,
))
}
Expand All @@ -211,7 +206,7 @@ impl<I: ConnId> Endpoint<I> {
pub fn new_client(
local_addr: impl Into<SocketAddr>,
config: Config,
) -> Result<(Self, IncomingMessages<I>, DisconnectionEvents), ClientEndpointError> {
) -> Result<(Self, IncomingMessages<I>), ClientEndpointError> {
let config = InternalConfig::try_from_config(config)?;

let (endpoint, _, channels) = Self::build_endpoint(
Expand All @@ -221,11 +216,7 @@ impl<I: ConnId> Endpoint<I> {
quinn::Endpoint::builder(),
)?;

Ok((
endpoint,
IncomingMessages(channels.message.1),
DisconnectionEvents(channels.disconnection.1),
))
Ok((endpoint, IncomingMessages(channels.message.1)))
}

// A private helper for initialising an endpoint.
Expand All @@ -249,7 +240,6 @@ impl<I: ConnId> Endpoint<I> {
quic_endpoint,
retry_config,
message_tx: channels.message.0.clone(),
disconnection_tx: channels.disconnection.0.clone(),
termination_tx: channels.termination.0.clone(),
connection_pool: ConnectionPool::new(),
connection_deduplicator: ConnectionDeduplicator::new(),
Expand Down Expand Up @@ -482,7 +472,6 @@ impl<I: ConnId> Endpoint<I> {
connection.clone(),
connection_incoming,
self.message_tx.clone(),
self.disconnection_tx.clone(),
);

let _ = completion.complete(Ok(()));
Expand Down Expand Up @@ -657,7 +646,6 @@ struct Channels<I: ConnId> {
MpscReceiver<ConnectionHandle<I>>,
),
message: (MpscSender<Msg<I>>, MpscReceiver<Msg<I>>),
disconnection: (MpscSender<SocketAddr>, MpscReceiver<SocketAddr>),
termination: (Sender<()>, broadcast::Receiver<()>),
}

Expand All @@ -666,7 +654,6 @@ impl<I: ConnId> Channels<I> {
Self {
connection: mpsc::channel(STANDARD_CHANNEL_SIZE),
message: mpsc::channel(STANDARD_CHANNEL_SIZE),
disconnection: mpsc::channel(STANDARD_CHANNEL_SIZE),
termination: broadcast::channel(1),
}
}
Expand All @@ -681,7 +668,7 @@ mod tests {

#[tokio::test]
async fn new_without_external_addr() -> Result<()> {
let (endpoint, _, _, _, _) = Endpoint::<[u8; 32]>::new(
let (endpoint, _, _, _) = Endpoint::<[u8; 32]>::new(
local_addr(),
&[],
Config {
Expand All @@ -698,7 +685,7 @@ mod tests {

#[tokio::test]
async fn new_with_external_ip() -> Result<()> {
let (endpoint, _, _, _, _) = Endpoint::<[u8; 32]>::new(
let (endpoint, _, _, _) = Endpoint::<[u8; 32]>::new(
local_addr(),
&[],
Config {
Expand All @@ -718,7 +705,7 @@ mod tests {

#[tokio::test]
async fn new_with_external_port() -> Result<()> {
let (endpoint, _, _, _, _) = Endpoint::<[u8; 32]>::new(
let (endpoint, _, _, _) = Endpoint::<[u8; 32]>::new(
local_addr(),
&[],
Config {
Expand All @@ -738,7 +725,7 @@ mod tests {

#[tokio::test]
async fn new_with_external_addr() -> Result<()> {
let (endpoint, _, _, _, _) = Endpoint::<[u8; 32]>::new(
let (endpoint, _, _, _) = Endpoint::<[u8; 32]>::new(
local_addr(),
&[],
Config {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod wire_msg;

pub use config::{Config, ConfigError, RetryConfig};
pub use connection::{RecvStream, SendStream};
pub use connection_handle::{ConnectionHandle as Connection, DisconnectionEvents};
pub use connection_handle::ConnectionHandle as Connection;
pub use connection_pool::ConnId;
pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
#[cfg(feature = "igd")]
Expand Down
Loading

0 comments on commit 93e70a7

Please sign in to comment.