Skip to content

Commit

Permalink
fix(all): remove FIFO queues and use the mpsc channels directly
Browse files Browse the repository at this point in the history
- split out the recievers out of the endpoint object
- fix all tests and doc tests to work with the new API
  • Loading branch information
lionel-faber committed Feb 12, 2021
1 parent 591ebf8 commit 2bab054
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 290 deletions.
61 changes: 34 additions & 27 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
bootstrap_cache::BootstrapCache,
config::{Config, SerialisableCertificate},
connections::{RecvStream, SendStream},
endpoint::Endpoint,
endpoint::{DisconnectionEvents, Endpoint, IncomingConnections, IncomingMessages},
error::{Error, Result},
peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC},
};
Expand Down Expand Up @@ -76,19 +76,6 @@ pub struct QuicP2p {
}

impl QuicP2p {
/// Construct `QuicP2p` with the default config and bootstrap cache enabled
///
/// # Example
///
/// ```
/// use qp2p::QuicP2p;
///
/// let quic_p2p = QuicP2p::new().expect("Error initializing QuicP2p");
/// ```
pub fn new() -> Result<Self> {
Self::with_config(None, Default::default(), true)
}

/// Construct `QuicP2p` with supplied parameters, ready to be used.
/// If config is not specified it'll call `Config::read_or_construct_default()`
///
Expand All @@ -104,8 +91,8 @@ impl QuicP2p {
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
///
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// config.port = Some(3000);
/// config.local_ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// config.local_port = Some(3000);
/// let hcc = &["127.0.0.1:8080".parse().unwrap()];
/// let quic_p2p = QuicP2p::with_config(Some(config), hcc, true).expect("Error initializing QuicP2p");
/// ```
Expand Down Expand Up @@ -217,20 +204,28 @@ impl QuicP2p {
/// async fn main() -> Result<(), Error> {
///
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// config.port = Some(3000);
/// config.local_ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// config.local_port = Some(3000);
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut endpoint = quic_p2p.new_endpoint().await?;
/// let (mut endpoint, _, _, _) = quic_p2p.new_endpoint().await?;
/// let peer_addr = endpoint.socket_addr().await?;
///
/// config.port = Some(3001);
/// config.local_port = Some(3001);
/// let mut quic_p2p = QuicP2p::with_config(Some(config), &[peer_addr], true)?;
/// let (endpoint, connection, incoming_messages) = quic_p2p.bootstrap().await?;
/// let endpoint = quic_p2p.bootstrap().await?;
/// Ok(())
/// }
/// ```
pub async fn bootstrap(&self) -> Result<Endpoint> {
let endpoint = self.new_endpoint().await?;
pub async fn bootstrap(
&self,
) -> Result<(
Endpoint,
IncomingConnections,
IncomingMessages,
DisconnectionEvents,
)> {
let (endpoint, incoming_connections, incoming_message, disconnections) =
self.new_endpoint().await?;

trace!("Bootstrapping with nodes {:?}", endpoint.bootstrap_nodes());
if endpoint.bootstrap_nodes().is_empty() {
Expand All @@ -251,7 +246,12 @@ impl QuicP2p {
})?
.0;

Ok(endpoint)
Ok((
endpoint,
incoming_connections,
incoming_message,
disconnections,
))
}

/// Create a new `Endpoint` which can be used to connect to peers and send
Expand All @@ -266,13 +266,20 @@ impl QuicP2p {
/// async fn main() -> Result<(), Error> {
///
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// config.local_ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let endpoint = quic_p2p.new_endpoint().await?;
/// let (endpoint, incoming_connections, incoming_messages, disconnections) = quic_p2p.new_endpoint().await?;
/// Ok(())
/// }
/// ```
pub async fn new_endpoint(&self) -> Result<Endpoint> {
pub async fn new_endpoint(
&self,
) -> Result<(
Endpoint,
IncomingConnections,
IncomingMessages,
DisconnectionEvents,
)> {
trace!("Creating a new endpoint");

let bootstrap_nodes: Vec<SocketAddr> = self
Expand Down
22 changes: 10 additions & 12 deletions src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,20 @@ impl ConnectionPool {
.is_some()
}

pub fn remove(&mut self, addr: &SocketAddr) -> Option<(quinn::Connection, ConnectionRemover)> {
pub fn remove(&mut self, addr: &SocketAddr) -> Vec<quinn::Connection> {
let mut store = self.store.lock().unwrap_or_else(PoisonError::into_inner);

let key = store
let keys_to_remove = store
.map
.range_mut(Key::min(*addr)..=Key::max(*addr))
.next()
.map(|(key, _)| key.clone())?;

let conn = store.map.remove(&key)?;
let remover = ConnectionRemover {
store: self.store.clone(),
key,
};

Some((conn, remover))
.into_iter()
.map(|(key, _)| key.clone())
.collect::<Vec<_>>();

keys_to_remove
.iter()
.filter_map(|key| store.map.remove(key))
.collect::<Vec<_>>()
}

pub fn get(&self, addr: &SocketAddr) -> Option<(quinn::Connection, ConnectionRemover)> {
Expand Down
96 changes: 19 additions & 77 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use super::{
wire_msg::WireMsg,
};
use bytes::Bytes;
use futures::{lock::Mutex, stream::StreamExt};
use futures::stream::StreamExt;
use log::{error, trace};
use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;
use tokio::select;
use tokio::sync::mpsc::UnboundedSender;

Expand All @@ -32,67 +32,11 @@ impl Connection {
Self { quic_conn, remover }
}

/// Returns the address of the connected peer.
///
/// # Example
///
/// ```
/// use qp2p::{QuicP2p, Config, Error};
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut peer_1 = quic_p2p.new_endpoint().await?;
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
/// assert_eq!(connection.remote_address(), peer1_addr);
/// Ok(())
/// }
/// ```
pub fn remote_address(&self) -> SocketAddr {
self.quic_conn.remote_address()
}

/// Get connection streams for reading/writing
///
/// # Example
///
/// ```
/// use qp2p::{QuicP2p, Config, Error};
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut peer_1 = quic_p2p.new_endpoint().await?;
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
/// let (send_stream, recv_stream) = connection.open_bi().await?;
/// Ok(())
/// }
/// ```
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream)> {
let (send_stream, recv_stream) = self.handle_error(self.quic_conn.open_bi().await)?;
Ok((SendStream::new(send_stream), RecvStream::new(recv_stream)))
}

/// Send message to the connected peer via a bi-directional stream.
/// This returns the streams to send additional messages / read responses sent using the same stream.
pub async fn send_bi(&self, msg: Bytes) -> Result<(SendStream, RecvStream)> {
let (mut send_stream, recv_stream) = self.open_bi().await?;
self.handle_error(send_stream.send_user_msg(msg).await)?;
Ok((send_stream, recv_stream))
}

/// Send message to peer using a uni-directional stream.
pub async fn send_uni(&self, msg: Bytes) -> Result<()> {
let mut send_stream = self.handle_error(self.quic_conn.open_uni().await)?;
Expand All @@ -110,12 +54,6 @@ impl Connection {
}
}

/// Gracefully close connection immediatelly
pub fn close(&self) {
self.quic_conn.close(0_u32.into(), b"");
self.remover.remove();
}

fn handle_error<T, E>(&self, result: Result<T, E>) -> Result<T, E> {
if result.is_err() {
self.remover.remove()
Expand All @@ -126,7 +64,7 @@ impl Connection {
}

/// Stream to receive multiple messages
pub(crate) struct RecvStream {
pub struct RecvStream {
pub(crate) quinn_recv_stream: quinn::RecvStream,
}

Expand All @@ -152,7 +90,7 @@ impl std::fmt::Debug for RecvStream {
}

/// Stream of outgoing messages
pub(crate) struct SendStream {
pub struct SendStream {
pub(crate) quinn_send_stream: quinn::SendStream,
}

Expand Down Expand Up @@ -364,11 +302,11 @@ async fn next_on_bi_streams(
mod tests {
use anyhow::anyhow;

use crate::{Error, config::Config, tests::get_incoming_connection, wire_msg::WireMsg};
use crate::api::QuicP2p;
use crate::{config::Config, wire_msg::WireMsg, Error};
use std::net::{IpAddr, Ipv4Addr};

#[tokio::test]
#[tokio::test(core_threads = 10)]
async fn echo_service() -> Result<(), Error> {
let qp2p = QuicP2p::with_config(
Some(Config {
Expand All @@ -379,24 +317,28 @@ mod tests {
Default::default(),
false,
)?;

// Create Endpoint
let mut peer1 = qp2p.new_endpoint().await?;
let (mut peer1, mut peer1_connections, _, _) = qp2p.new_endpoint().await?;
let peer1_addr = peer1.socket_addr().await?;
let mut peer2 = qp2p.new_endpoint().await?;

let (mut peer2, _, _, _) = qp2p.new_endpoint().await?;
let peer2_addr = peer2.socket_addr().await?;

peer2.connect_to(&peer1_addr).await?;

if let Some(connecting_peer) = get_incoming_connection(&mut peer1).await {
if let Some(connecting_peer) = peer1_connections.next().await {
assert_eq!(connecting_peer, peer2_addr);
}

let connection = peer1.get_connection(&peer2_addr).ok_or_else(|| Error::MissingConnection)?;

let connection = peer1
.get_connection(&peer2_addr)
.ok_or_else(|| Error::MissingConnection)?;
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
let message = WireMsg::EndpointEchoReq;
message.write_to_stream(&mut send_stream.quinn_send_stream).await?;
message
.write_to_stream(&mut send_stream.quinn_send_stream)
.await?;
let message = WireMsg::read_from_stream(&mut recv_stream.quinn_recv_stream).await?;
if let WireMsg::EndpointEchoResp(addr) = message {
assert_eq!(addr, peer1_addr);
Expand All @@ -405,4 +347,4 @@ mod tests {
}
Ok(())
}
}
}
Loading

0 comments on commit 2bab054

Please sign in to comment.