Skip to content

Commit

Permalink
refactor: use remote address from connection handle
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Nov 19, 2020
1 parent 8ca43c4 commit 3f518ce
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
4 changes: 4 additions & 0 deletions src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl ConnectionRemover {
let mut store = self.store.lock().unwrap_or_else(PoisonError::into_inner);
let _ = store.map.remove(&self.key);
}

pub fn remote_addr(&self) -> &SocketAddr {
&self.key.addr
}
}

#[derive(Default)]
Expand Down
20 changes: 6 additions & 14 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,9 @@ impl IncomingConnections {
}) => {
let pool_handle = self
.connection_pool
.insert(connection.remote_address(), connection.clone());

Some(IncomingMessages::new(
connection.remote_address(),
uni_streams,
bi_streams,
pool_handle,
))
.insert(connection.remote_address(), connection);

Some(IncomingMessages::new(uni_streams, bi_streams, pool_handle))
}
Err(_err) => None,
},
Expand All @@ -168,21 +163,18 @@ impl IncomingConnections {

/// Stream of incoming QUIC messages
pub struct IncomingMessages {
peer_addr: SocketAddr,
uni_streams: quinn::IncomingUniStreams,
bi_streams: quinn::IncomingBiStreams,
remover: ConnectionRemover,
}

impl IncomingMessages {
pub(crate) fn new(
peer_addr: SocketAddr,
uni_streams: quinn::IncomingUniStreams,
bi_streams: quinn::IncomingBiStreams,
remover: ConnectionRemover,
) -> Self {
Self {
peer_addr,
uni_streams,
bi_streams,
remover,
Expand All @@ -191,23 +183,23 @@ impl IncomingMessages {

/// Returns the address of the peer who initiated the connection
pub fn remote_addr(&self) -> SocketAddr {
self.peer_addr
*self.pool_handle.remote_addr()
}

/// Returns next message sent by the peer on current QUIC connection,
/// either received through a bi-directional or uni-directional stream.
pub async fn next(&mut self) -> Option<Message> {
// Each stream initiated by the remote peer constitutes a new message.
// Read the next message available in any of the two type of streams.
let src = self.peer_addr;
let src = self.remote_addr();
select! {
next_uni = Self::next_on_uni_streams(&mut self.uni_streams) =>
next_uni.map(|(bytes, recv)| Message::UniStream {
bytes,
src,
recv: RecvStream::new(recv)
}),
next_bi = Self::next_on_bi_streams(&mut self.bi_streams, self.peer_addr) =>
next_bi = Self::next_on_bi_streams(&mut self.bi_streams, src) =>
next_bi.map(|(bytes, send, recv)| Message::BiStream {
bytes,
src,
Expand Down
13 changes: 6 additions & 7 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ impl Endpoint {
}
}

/// Connects to another peer
/// Connects to another peer.
///
///
pub async fn connect_to(
&self,
Expand All @@ -172,14 +173,12 @@ impl Endpoint {

trace!("Successfully connected to peer: {}", node_addr);

let guard = self.connection_pool.insert(
new_conn.connection.remote_address(),
new_conn.connection.clone(),
);
let guard = self
.connection_pool
.insert(*node_addr, new_conn.connection.clone());

let conn = Connection::new(new_conn.connection, guard.clone());
let incoming_msgs =
IncomingMessages::new(*node_addr, new_conn.uni_streams, new_conn.bi_streams, guard);
let incoming_msgs = IncomingMessages::new(new_conn.uni_streams, new_conn.bi_streams, guard);

Ok((conn, Some(incoming_msgs)))
}
Expand Down

0 comments on commit 3f518ce

Please sign in to comment.