Skip to content

Commit

Permalink
A0-1455: Refactor for incoming protocol upgrade (#711)
Browse files Browse the repository at this point in the history
* Refactor for incoming protocol upgrade

* The type should be what is sent, not the sending mechanism
  • Loading branch information
timorl authored Nov 8, 2022
1 parent 6ec384f commit 94039f2
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 187 deletions.
10 changes: 4 additions & 6 deletions finality-aleph/src/validator_network/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use aleph_primitives::AuthorityId;
use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use log::{debug, info};

use crate::{
crypto::AuthorityPen,
validator_network::{
protocol_negotiation::{protocol, ProtocolNegotiationError},
protocols::ProtocolError,
protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService},
Data, Splittable,
},
};
Expand Down Expand Up @@ -43,7 +41,7 @@ impl From<ProtocolError> for IncomingError {
async fn manage_incoming<D: Data, S: Splittable>(
authority_pen: AuthorityPen,
stream: S,
result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>,
result_for_parent: mpsc::UnboundedSender<ResultForService<D>>,
data_for_user: mpsc::UnboundedSender<D>,
) -> Result<(), IncomingError> {
debug!(target: "validator-network", "Performing incoming protocol negotiation.");
Expand All @@ -62,7 +60,7 @@ async fn manage_incoming<D: Data, S: Splittable>(
pub async fn incoming<D: Data, S: Splittable>(
authority_pen: AuthorityPen,
stream: S,
result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>,
result_for_parent: mpsc::UnboundedSender<ResultForService<D>>,
data_for_user: mpsc::UnboundedSender<D>,
) {
let addr = stream.peer_address_info();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,10 @@ use std::{
};

use aleph_primitives::AuthorityId;
use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;

use crate::{network::PeerId, validator_network::Data};

/// Network component responsible for holding the list of peers that we
/// want to connect to, and managing the established connections.
pub struct Manager<A: Data, D: Data> {
addresses: HashMap<AuthorityId, Vec<A>>,
outgoing: HashMap<AuthorityId, mpsc::UnboundedSender<D>>,
incoming: HashMap<AuthorityId, oneshot::Sender<()>>,
}

/// Error during sending data through the Manager
#[derive(Debug, PartialEq, Eq)]
pub enum SendError {
Expand All @@ -35,6 +27,25 @@ impl Display for SendError {
}
}

/// Possible results of adding connections.
#[derive(Debug, PartialEq, Eq)]
pub enum AddResult {
/// We do not want to maintain a connection with this peer, i.e. it is not on
/// our list of wanted peers, so the connection was not stored.
Uninterested,
/// Connection added; there was no previous connection with this peer.
Added,
/// Old connection with this peer replaced with the new one.
Replaced,
}

/// Network component responsible for holding the list of peers that we
/// want to connect to, and managing the established connections.
pub struct Manager<A: Data, D: Data> {
// Addresses at which each peer we want to stay connected to can be reached.
addresses: HashMap<AuthorityId, Vec<A>>,
// Data senders for established outgoing connections, one per peer.
outgoing: HashMap<AuthorityId, mpsc::UnboundedSender<D>>,
// Data senders for established incoming connections, one per peer.
// NOTE(review): changed in this commit from a oneshot exit channel to a data
// sender, matching the bidirectional protocol upgrade.
incoming: HashMap<AuthorityId, mpsc::UnboundedSender<D>>,
}

struct ManagerStatus {
wanted_peers: usize,
both_ways_peers: HashSet<AuthorityId>,
Expand All @@ -48,7 +59,7 @@ impl ManagerStatus {
let incoming: HashSet<_> = manager
.incoming
.iter()
.filter(|(_, exit)| !exit.is_canceled())
.filter(|(_, exit)| !exit.is_closed())
.map(|(k, _)| k.clone())
.collect();
let outgoing: HashSet<_> = manager
Expand Down Expand Up @@ -149,17 +160,6 @@ impl Display for ManagerStatus {
}
}

/// Possible results of adding connections.
#[derive(Debug, PartialEq, Eq)]
pub enum AddResult {
/// We do not want to maintain a connection with this peer.
Uninterested,
/// Connection added.
Added,
/// Old connection replaced with new one.
Replaced,
}

impl<A: Data, D: Data> Manager<A, D> {
/// Create a new Manager with empty list of peers.
pub fn new() -> Self {
Expand Down Expand Up @@ -202,7 +202,11 @@ impl<A: Data, D: Data> Manager<A, D> {

/// Add an established incoming connection with a known peer,
/// but only if the peer is on the list of peers that we want to stay connected with.
pub fn add_incoming(&mut self, peer_id: AuthorityId, exit: oneshot::Sender<()>) -> AddResult {
pub fn add_incoming(
&mut self,
peer_id: AuthorityId,
exit: mpsc::UnboundedSender<D>,
) -> AddResult {
use AddResult::*;
if !self.addresses.contains_key(&peer_id) {
return Uninterested;
Expand Down Expand Up @@ -240,10 +244,7 @@ impl<A: Data, D: Data> Manager<A, D> {

#[cfg(test)]
mod tests {
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use futures::{channel::mpsc, StreamExt};

use super::{AddResult::*, Manager, SendError};
use crate::validator_network::mock::key;
Expand Down Expand Up @@ -319,27 +320,27 @@ mod tests {
String::from("a/b/c"),
String::from("43.43.43.43:43000"),
];
let (tx, rx) = oneshot::channel();
let (tx, mut rx) = mpsc::unbounded();
// try add unknown peer
assert_eq!(manager.add_incoming(peer_id.clone(), tx), Uninterested);
// rx should fail
assert!(rx.await.is_err());
assert!(rx.try_next().expect("channel should be closed").is_none());
// add peer, this time for real
assert!(manager.add_peer(peer_id.clone(), addresses.clone()));
let (tx, mut rx) = oneshot::channel();
let (tx, mut rx) = mpsc::unbounded();
// should just add
assert_eq!(manager.add_incoming(peer_id.clone(), tx), Added);
// the exit channel should be open
assert!(rx.try_recv().is_ok());
let (tx, mut rx2) = oneshot::channel();
assert!(rx.try_next().is_err());
let (tx, mut rx2) = mpsc::unbounded();
// should replace now
assert_eq!(manager.add_incoming(peer_id.clone(), tx), Replaced);
// receiving should fail on old, but work on new channel
assert!(rx.try_recv().is_err());
assert!(rx2.try_recv().is_ok());
assert!(rx.try_next().expect("channel should be closed").is_none());
assert!(rx2.try_next().is_err());
// remove peer
manager.remove_peer(&peer_id);
// receiving should fail
assert!(rx2.try_recv().is_err());
assert!(rx2.try_next().expect("channel should be closed").is_none());
}
}
3 changes: 0 additions & 3 deletions finality-aleph/src/validator_network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ use codec::Codec;
use sp_core::crypto::KeyTypeId;
use tokio::io::{AsyncRead, AsyncWrite};

mod handshake;
mod heartbeat;
mod incoming;
mod io;
mod manager;
#[cfg(test)]
pub mod mock;
mod outgoing;
mod protocol_negotiation;
mod protocols;
mod service;

Expand Down
25 changes: 19 additions & 6 deletions finality-aleph/src/validator_network/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use tokio::time::{sleep, Duration};
use crate::{
crypto::AuthorityPen,
validator_network::{
protocol_negotiation::{protocol, ProtocolNegotiationError},
protocols::ProtocolError,
protocols::{
protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService,
},
ConnectionInfo, Data, Dialer, PeerAddressInfo,
},
};
Expand Down Expand Up @@ -44,7 +45,8 @@ async fn manage_outgoing<D: Data, A: Data, ND: Dialer<A>>(
peer_id: AuthorityId,
mut dialer: ND,
addresses: Vec<A>,
result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option<mpsc::UnboundedSender<D>>)>,
result_for_parent: mpsc::UnboundedSender<ResultForService<D>>,
data_for_user: mpsc::UnboundedSender<D>,
) -> Result<(), OutgoingError<A, ND>> {
debug!(target: "validator-network", "Trying to connect to {}.", peer_id);
let stream = dialer
Expand All @@ -58,7 +60,13 @@ async fn manage_outgoing<D: Data, A: Data, ND: Dialer<A>>(
.map_err(|e| OutgoingError::ProtocolNegotiation(peer_address_info.clone(), e))?;
debug!(target: "validator-network", "Negotiated protocol, running.");
protocol
.manage_outgoing(stream, authority_pen, peer_id, result_for_parent)
.manage_outgoing(
stream,
authority_pen,
peer_id,
result_for_parent,
data_for_user,
)
.await
.map_err(|e| OutgoingError::Protocol(peer_address_info.clone(), e))
}
Expand All @@ -73,20 +81,25 @@ pub async fn outgoing<D: Data, A: Data + Debug, ND: Dialer<A>>(
peer_id: AuthorityId,
dialer: ND,
addresses: Vec<A>,
result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option<mpsc::UnboundedSender<D>>)>,
result_for_parent: mpsc::UnboundedSender<ResultForService<D>>,
data_for_user: mpsc::UnboundedSender<D>,
) {
if let Err(e) = manage_outgoing(
authority_pen,
peer_id.clone(),
dialer,
addresses.clone(),
result_for_parent.clone(),
data_for_user,
)
.await
{
info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", peer_id, addresses, e, RETRY_DELAY.as_secs());
sleep(RETRY_DELAY).await;
if result_for_parent.unbounded_send((peer_id, None)).is_err() {
if result_for_parent
.unbounded_send((peer_id, None, ConnectionType::LegacyOutgoing))
.is_err()
{
debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service.");
}
}
Expand Down
143 changes: 143 additions & 0 deletions finality-aleph/src/validator_network/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use aleph_primitives::AuthorityId;
use futures::channel::mpsc;

use crate::{
crypto::AuthorityPen,
validator_network::{
io::{ReceiveError, SendError},
Data, Splittable,
},
};

mod handshake;
mod negotiation;
mod v0;

use handshake::HandshakeError;
pub use negotiation::{protocol, ProtocolNegotiationError};

/// Numeric protocol version, as negotiated between peers and mapped to a
/// concrete [`Protocol`] via `TryFrom<Version>`.
pub type Version = u32;

/// The types of connections needed for backwards compatibility with the legacy two connections
/// protocol. Remove after it's no longer needed.
#[derive(PartialEq, Debug, Eq, Clone, Copy)]
pub enum ConnectionType {
/// A connection initiated by the remote peer, under the legacy protocol.
LegacyIncoming,
/// A connection initiated by us, under the legacy protocol.
LegacyOutgoing,
}

/// What connections send back to the service after they become established. Consists of the peer
/// id of the remote node, followed by a channel for sending data to that node — `None` if the
/// connection was unsuccessful and should be reestablished — and finally a marker for legacy
/// protocol compatibility (see [`ConnectionType`]).
pub type ResultForService<D> = (
AuthorityId,
Option<mpsc::UnboundedSender<D>>,
ConnectionType,
);

/// Defines the protocol for communication. Obtained from a numeric [`Version`]
/// through the `TryFrom<Version>` implementation below.
#[derive(Debug, PartialEq, Eq)]
pub enum Protocol {
/// The first version of the protocol, with unidirectional connections.
V0,
}

/// Errors that can occur while a protocol is being performed or maintained.
#[derive(Debug)]
pub enum ProtocolError {
/// Error during performing a handshake.
HandshakeError(HandshakeError),
/// Sending failed.
SendError(SendError),
/// Receiving failed.
ReceiveError(ReceiveError),
/// Heartbeat stopped.
CardiacArrest,
/// Channel to the parent service closed.
NoParentConnection,
/// Data channel closed.
NoUserConnection,
}

impl Display for ProtocolError {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use ProtocolError::*;
match self {
HandshakeError(e) => write!(f, "handshake error: {}", e),
SendError(e) => write!(f, "send error: {}", e),
ReceiveError(e) => write!(f, "receive error: {}", e),
CardiacArrest => write!(f, "heartbeat stopped"),
NoParentConnection => write!(f, "cannot send result to service"),
NoUserConnection => write!(f, "cannot send data to user"),
}
}
}

// The conversions below let the protocol implementations propagate lower-level
// failures with `?`, wrapping them in the matching `ProtocolError` variant.

impl From<HandshakeError> for ProtocolError {
fn from(e: HandshakeError) -> Self {
ProtocolError::HandshakeError(e)
}
}

impl From<SendError> for ProtocolError {
fn from(e: SendError) -> Self {
ProtocolError::SendError(e)
}
}

impl From<ReceiveError> for ProtocolError {
fn from(e: ReceiveError) -> Self {
ProtocolError::ReceiveError(e)
}
}

impl Protocol {
/// Minimal supported protocol version.
const MIN_VERSION: Version = 0;

/// Maximal supported protocol version.
const MAX_VERSION: Version = 0;

/// Launches the proper variant of the protocol (receiver half).
pub async fn manage_incoming<D: Data, S: Splittable>(
&self,
stream: S,
authority_pen: AuthorityPen,
result_for_service: mpsc::UnboundedSender<ResultForService<D>>,
data_for_user: mpsc::UnboundedSender<D>,
) -> Result<(), ProtocolError> {
use Protocol::*;
match self {
V0 => v0::incoming(stream, authority_pen, result_for_service, data_for_user).await,
}
}

/// Launches the proper variant of the protocol (sender half).
pub async fn manage_outgoing<D: Data, S: Splittable>(
&self,
stream: S,
authority_pen: AuthorityPen,
peer_id: AuthorityId,
result_for_service: mpsc::UnboundedSender<ResultForService<D>>,
_data_for_user: mpsc::UnboundedSender<D>,
) -> Result<(), ProtocolError> {
use Protocol::*;
match self {
V0 => v0::outgoing(stream, authority_pen, peer_id, result_for_service).await,
}
}
}

/// Maps a numeric wire version to a concrete protocol variant; an unrecognized
/// version is handed back unchanged as the error value.
impl TryFrom<Version> for Protocol {
    type Error = Version;

    fn try_from(version: Version) -> Result<Self, Self::Error> {
        if version == 0 {
            Ok(Protocol::V0)
        } else {
            Err(version)
        }
    }
}
Loading

0 comments on commit 94039f2

Please sign in to comment.