Skip to content

Commit

Permalink
feat!: implement connection pooling
Browse files Browse the repository at this point in the history
BREAKING CHANGE:
    - `Endpoint::connect_to` now returns pair or `(Connection, Option<IncomingMessages>)` (previously it returned only `Connection`).
    - `Connection::open_bi_stream` renamed to `open_bi` (same as in quinn)
    - `Connection::send` renamed to `send_bi` for consistency
    - `Endpoint::listen` no longer returns `Result`
  • Loading branch information
madadam committed Nov 19, 2020
1 parent c91567d commit 6edb290
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 99 deletions.
6 changes: 3 additions & 3 deletions examples/echo_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn main() -> Result<(), Error> {
println!("Process running at: {}", &socket_addr);
if genesis {
println!("Waiting for connections");
let mut incoming = endpoint.listen()?;
let mut incoming = endpoint.listen();
let mut messages = incoming
.next()
.await
Expand Down Expand Up @@ -63,8 +63,8 @@ async fn main() -> Result<(), Error> {
} else {
println!("Echo service complete");
let node_addr = bootstrap_nodes[0];
let connection = endpoint.connect_to(&node_addr).await?;
let (mut send, mut recv) = connection.open_bi_stream().await?;
let (connection, _) = endpoint.connect_to(&node_addr).await?;
let (mut send, mut recv) = connection.open_bi().await?;
loop {
println!("Enter message:");
let mut input = String::new();
Expand Down
2 changes: 1 addition & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ async fn new_connection_to(
bootstrap_nodes,
qp2p_config,
)?;
let connection = endpoint.connect_to(node_addr).await?;
let (connection, _) = endpoint.connect_to(node_addr).await?;

Ok((endpoint, connection))
}
Expand Down
112 changes: 74 additions & 38 deletions src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,72 +7,108 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use slotmap::{DefaultKey, DenseSlotMap};
use std::{
collections::{hash_map::Entry, HashMap},
collections::BTreeMap,
net::SocketAddr,
sync::{Arc, RwLock},
sync::{Arc, Mutex, PoisonError},
};

// Pool for keeping open connections. Pooled connections are associated with a `ConnectionRemover`
// which can be used to remove them from the pool.
#[derive(Clone)]
pub(crate) struct ConnectionPool {
store: Arc<RwLock<Store>>,
store: Arc<Mutex<Store>>,
}

impl ConnectionPool {
pub fn new() -> Self {
Self {
store: Arc::new(RwLock::new(Store {
connections: DenseSlotMap::new(),
keys: HashMap::new(),
})),
store: Arc::new(Mutex::new(Store::default())),
}
}

pub fn insert(&self, conn: quinn::Connection) -> Handle {
let addr = conn.remote_address();
pub fn insert(&self, addr: SocketAddr, conn: quinn::Connection) -> ConnectionRemover {
let mut store = self.store.lock().unwrap_or_else(PoisonError::into_inner);

let mut store = self.store.write().expect("RwLock poisoned");
let key = store.connections.insert(conn);
let _ = store.keys.insert(addr, key);
let key = Key {
addr,
id: store.id_gen.next(),
};
let _ = store.map.insert(key, conn);

Handle {
ConnectionRemover {
store: self.store.clone(),
key,
}
}

pub fn get(&self, addr: &SocketAddr) -> Option<quinn::Connection> {
let store = self.store.read().ok()?;
let key = store.keys.get(addr)?;
store.connections.get(*key).cloned()
pub fn get(&self, addr: &SocketAddr) -> Option<(quinn::Connection, ConnectionRemover)> {
let mut store = self.store.lock().unwrap_or_else(PoisonError::into_inner);

// Efficiently fetch the first entry whose key is equal to `key`.
let (key, conn) = store
.map
.range_mut(Key::min(*addr)..=Key::max(*addr))
.next()?;

let conn = conn.clone();
let remover = ConnectionRemover {
store: self.store.clone(),
key: *key,
};

Some((conn, remover))
}
}

pub(crate) struct Handle {
store: Arc<RwLock<Store>>,
key: DefaultKey,
// Handle for removing a connection from the pool.
#[derive(Clone)]
pub(crate) struct ConnectionRemover {
store: Arc<Mutex<Store>>,
key: Key,
}

impl Drop for Handle {
fn drop(&mut self) {
let mut store = if let Ok(store) = self.store.write() {
store
} else {
return;
};

if let Some(conn) = store.connections.remove(self.key) {
if let Entry::Occupied(entry) = store.keys.entry(conn.remote_address()) {
if entry.get() == &self.key {
let _ = entry.remove();
}
}
}
impl ConnectionRemover {
// Remove the connection from the pool.
pub fn remove(&self) {
let mut store = self.store.lock().unwrap_or_else(PoisonError::into_inner);
let _ = store.map.remove(&self.key);
}
}

#[derive(Default)]
struct Store {
connections: DenseSlotMap<DefaultKey, quinn::Connection>,
keys: HashMap<SocketAddr, DefaultKey>,
map: BTreeMap<Key, quinn::Connection>,
id_gen: IdGen,
}

// Unique key identifying a connection. Two connections will always have distict keys even if they
// have the same socket address.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
struct Key {
addr: SocketAddr,
id: u64,
}

impl Key {
// Returns the minimal `Key` for the given address according to its `Ord` relation.
fn min(addr: SocketAddr) -> Self {
Self { addr, id: u64::MIN }
}

// Returns the maximal `Key` for the given address according to its `Ord` relation.
fn max(addr: SocketAddr) -> Self {
Self { addr, id: u64::MAX }
}
}

#[derive(Default)]
struct IdGen(u64);

impl IdGen {
fn next(&mut self) -> u64 {
let id = self.0;
self.0 = self.0.wrapping_add(1);
id
}
}
72 changes: 54 additions & 18 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use super::{
api::Message,
connection_pool::{ConnectionPool, ConnectionRemover},
error::{Error, Result},
wire_msg::WireMsg,
};
Expand All @@ -21,6 +22,7 @@ use tokio::select;
/// Connection instance to a node which can be used to send messages to it
pub struct Connection {
quic_conn: quinn::Connection,
remover: ConnectionRemover,
}

impl Drop for Connection {
Expand All @@ -30,8 +32,8 @@ impl Drop for Connection {
}

impl Connection {
pub(crate) async fn new(quic_conn: quinn::Connection) -> Result<Self> {
Ok(Self { quic_conn })
pub(crate) fn new(quic_conn: quinn::Connection, remover: ConnectionRemover) -> Self {
Self { quic_conn, remover }
}

/// Returns the address of the connected peer.
Expand Down Expand Up @@ -78,44 +80,62 @@ impl Connection {
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
/// let (send_stream, recv_stream) = connection.open_bi_stream().await?;
/// let (send_stream, recv_stream) = connection.open_bi().await?;
/// Ok(())
/// }
/// ```
pub async fn open_bi_stream(&self) -> Result<(SendStream, RecvStream)> {
let (send_stream, recv_stream) = self.quic_conn.open_bi().await?;
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream)> {
let (send_stream, recv_stream) = self.handle_error(self.quic_conn.open_bi().await)?;
Ok((SendStream::new(send_stream), RecvStream::new(recv_stream)))
}

/// Send message to the connected peer via a bi-directional stream.
/// This returns the streams to send additional messages / read responses sent using the same stream.
pub async fn send(&self, msg: Bytes) -> Result<(SendStream, RecvStream)> {
let (mut send_stream, recv_stream) = self.open_bi_stream().await?;
send_stream.send_user_msg(msg).await?;
pub async fn send_bi(&self, msg: Bytes) -> Result<(SendStream, RecvStream)> {
let (mut send_stream, recv_stream) = self.open_bi().await?;
self.handle_error(send_stream.send_user_msg(msg).await)?;
Ok((send_stream, recv_stream))
}

/// Send message to peer using a uni-directional stream.
pub async fn send_uni(&self, msg: Bytes) -> Result<()> {
let mut send_stream = self.quic_conn.open_uni().await?;
send_msg(&mut send_stream, msg).await?;
send_stream.finish().await.map_err(Error::from)
let mut send_stream = self.handle_error(self.quic_conn.open_uni().await)?;
self.handle_error(send_msg(&mut send_stream, msg).await)?;
self.handle_error(send_stream.finish().await)
.map_err(Error::from)
}

/// Gracefully close connection immediatelly
pub fn close(&self) {
self.quic_conn.close(0u32.into(), b"");
// TODO: uncomment
// self.remover.remove();
}

fn handle_error<T, E>(&self, result: Result<T, E>) -> Result<T, E> {
if result.is_err() {
self.remover.remove()
}

result
}
}

/// Stream of incoming QUIC connections
pub struct IncomingConnections {
quinn_incoming: Arc<Mutex<quinn::Incoming>>,
connection_pool: ConnectionPool,
}

impl IncomingConnections {
pub(crate) fn new(quinn_incoming: Arc<Mutex<quinn::Incoming>>) -> Result<Self> {
Ok(Self { quinn_incoming })
pub(crate) fn new(
quinn_incoming: Arc<Mutex<quinn::Incoming>>,
connection_pool: ConnectionPool,
) -> Self {
Self {
quinn_incoming,
connection_pool,
}
}

/// Returns next QUIC connection established by a peer
Expand All @@ -127,11 +147,18 @@ impl IncomingConnections {
uni_streams,
bi_streams,
..
}) => Some(IncomingMessages::new(
connection.remote_address(),
uni_streams,
bi_streams,
)),
}) => {
let pool_handle = self
.connection_pool
.insert(connection.remote_address(), connection.clone());

Some(IncomingMessages::new(
connection.remote_address(),
uni_streams,
bi_streams,
pool_handle,
))
}
Err(_err) => None,
},
None => None,
Expand All @@ -144,18 +171,21 @@ pub struct IncomingMessages {
peer_addr: SocketAddr,
uni_streams: quinn::IncomingUniStreams,
bi_streams: quinn::IncomingBiStreams,
remover: ConnectionRemover,
}

impl IncomingMessages {
pub(crate) fn new(
peer_addr: SocketAddr,
uni_streams: quinn::IncomingUniStreams,
bi_streams: quinn::IncomingBiStreams,
remover: ConnectionRemover,
) -> Self {
Self {
peer_addr,
uni_streams,
bi_streams,
remover,
}
}

Expand Down Expand Up @@ -250,6 +280,12 @@ impl IncomingMessages {
}
}

impl Drop for IncomingMessages {
fn drop(&mut self) {
self.remover.remove()
}
}

/// Stream to receive multiple messages
pub struct RecvStream {
pub(crate) quinn_recv_stream: quinn::RecvStream,
Expand Down
Loading

0 comments on commit 6edb290

Please sign in to comment.