Skip to content

Commit

Permalink
feat: Add Endpoint::new_client constructor
Browse files Browse the repository at this point in the history
This creates an `Endpoint` that can work as a client only (all the
machinery for incoming transactions are dropped).

Another difference from `Endpoint::new` is that this does none of the
reachability checking, since this should not be necessary for client-
only endpoints (though, the API for bidirectional streams is currently
unusable). This massively reduces the possible errors, which have their
own `ClientError` enum.
  • Loading branch information
Chris Connelly authored and connec committed Aug 27, 2021
1 parent 5d374fa commit 176d025
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 3 deletions.
41 changes: 39 additions & 2 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use super::error::Error;
use super::igd::forward_port;
use super::wire_msg::WireMsg;
use super::{
config::InternalConfig,
config::{Config, InternalConfig},
connection_deduplicator::ConnectionDeduplicator,
connection_pool::ConnectionPool,
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
DisconnectionEvents, RecvStream, SendStream,
},
error::Result,
error::{ClientEndpointError, Result},
};
use bytes::Bytes;
use std::net::SocketAddr;
Expand Down Expand Up @@ -94,6 +94,43 @@ impl<I: ConnId> std::fmt::Debug for Endpoint<I> {
}

impl<I: ConnId> Endpoint<I> {
/// Create a client endpoint at the given address.
///
/// A client endpoint cannot receive incoming connections, as such they also do not need to be
/// publicly reachable. They can still communicate over outgoing connections and receive
/// incoming streams, since QUIC allows for either side of a connection to initiate streams.
pub fn new_client(
local_addr: impl Into<SocketAddr>,
config: Config,
) -> Result<Self, ClientEndpointError> {
let config = InternalConfig::try_from_config(config)?;

let local_addr = local_addr.into();
let (quic_endpoint, _) = quinn::Endpoint::builder().bind(&local_addr)?;
let local_addr = quic_endpoint
.local_addr()
.map_err(ClientEndpointError::Socket)?;

let (message_tx, _) = mpsc::channel(STANDARD_CHANNEL_SIZE);
let (disconnection_tx, _) = mpsc::channel(STANDARD_CHANNEL_SIZE);
let (termination_tx, _) = broadcast::channel(1);

let endpoint = Self {
local_addr,
public_addr: None,
quic_endpoint,
message_tx,
disconnection_tx,
bootstrap_nodes: Default::default(),
config,
termination_tx,
connection_pool: ConnectionPool::new(),
connection_deduplicator: ConnectionDeduplicator::new(),
};

Ok(endpoint)
}

pub(crate) async fn new(
quic_endpoint: quinn::Endpoint,
quic_incoming: quinn::Incoming,
Expand Down
21 changes: 21 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// Software.

use super::wire_msg::WireMsg;
use crate::config::ConfigError;
use bytes::Bytes;
use std::{fmt, io};
use thiserror::Error;
Expand Down Expand Up @@ -127,6 +128,26 @@ impl From<quinn::ConnectionError> for Error {
}
}

/// Errors returned by [`Endpoint::new_client`](crate::Endpoint::new_client).
#[derive(Debug, Error)]
pub enum ClientEndpointError {
/// There was a problem with the provided configuration.
#[error("There was a problem with the provided configuration")]
Config(#[from] ConfigError),

/// Failed to bind UDP socket.
#[error("Failed to bind UDP socket")]
Socket(#[source] io::Error),
}

impl From<quinn::EndpointError> for ClientEndpointError {
fn from(error: quinn::EndpointError) -> Self {
match error {
quinn::EndpointError::Socket(error) => Self::Socket(error),
}
}
}

/// The reason a connection was closed.
#[derive(Debug)]
pub enum Close {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub use config::{Config, ConfigError};
pub use connection_pool::ConnId;
pub use connections::{DisconnectionEvents, RecvStream, SendStream};
pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
pub use error::{Close, ConnectionError, Error, Result, TransportErrorCode};
pub use error::{ClientEndpointError, Close, ConnectionError, Error, Result, TransportErrorCode};

#[cfg(test)]
mod tests;
30 changes: 30 additions & 0 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,3 +676,33 @@ async fn reachability() -> Result<()> {
ep1.is_reachable(&reachable_addr).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
#[traced_test]
async fn client() -> Result<()> {
use crate::{Config, Endpoint};

let qp2p = new_qp2p()?;

let (server, _, mut server_messages, _) = qp2p.new_endpoint(local_addr()).await?;
let client = Endpoint::<[u8; 32]>::new_client(
local_addr(),
Config {
min_retry_duration: Some(Duration::from_millis(500)),
..Default::default()
},
)?;

client
.send_message(b"hello"[..].into(), &server.public_addr(), 0)
.await?;

let (sender, message) = server_messages
.next()
.await
.ok_or_else(|| anyhow!("Did not receive expected message"))?;
assert_eq!(sender, client.public_addr());
assert_eq!(&message[..], b"hello");

Ok(())
}

0 comments on commit 176d025

Please sign in to comment.