From b71e84fc11bc10bde2c96bbbdac0577e34d27f79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Fri, 27 Jan 2023 14:57:08 +0100 Subject: [PATCH 01/14] new authorizator service for the validator network - no send/receive data before a user is authorized --- .../src/network/clique/authorization.rs | 184 ++++++++++++++++++ finality-aleph/src/network/clique/incoming.rs | 21 +- .../src/network/clique/manager/mod.rs | 4 + finality-aleph/src/network/clique/mod.rs | 1 + .../src/network/clique/protocols/handshake.rs | 34 ++++ .../src/network/clique/protocols/mod.rs | 32 ++- .../src/network/clique/protocols/v0/mod.rs | 105 ++++++++-- .../src/network/clique/protocols/v1/mod.rs | 80 +++++++- finality-aleph/src/network/clique/service.rs | 22 ++- 9 files changed, 452 insertions(+), 31 deletions(-) create mode 100644 finality-aleph/src/network/clique/authorization.rs diff --git a/finality-aleph/src/network/clique/authorization.rs b/finality-aleph/src/network/clique/authorization.rs new file mode 100644 index 0000000000..c71174cab6 --- /dev/null +++ b/finality-aleph/src/network/clique/authorization.rs @@ -0,0 +1,184 @@ +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; + +#[derive(Debug, PartialEq, Eq)] +pub enum AuthorizatorError { + MissingService, + ServiceDisappeared, +} + +/// Allows one to authorize incoming public-keys. +#[async_trait::async_trait] +pub trait Authorization { + async fn is_authorized(&self, value: PK) -> Result; +} + +struct AuthorizationHandler { + identifier: PK, + result_sender: oneshot::Sender, +} + +impl AuthorizationHandler { + fn new(result: PK) -> (Self, oneshot::Receiver) { + let (auth_sender, auth_receiver) = oneshot::channel(); + ( + Self { + identifier: result, + result_sender: auth_sender, + }, + auth_receiver, + ) + } + + pub fn handle_authorization( + self, + handler: impl FnOnce(PK) -> bool, + ) -> Result<(), AuthorizatorError> { + let auth_result = handler(self.identifier); + self.result_sender + .send(auth_result) + .map_err(|_| AuthorizatorError::MissingService) + } +} + +/// Used for validation of authorization requests. One should call [handle_authorization](Self::handle_authorization) and +/// provide a callback responsible for authorization. Each such call should be matched with call to +/// [Authorizator::is_authorized](Authorizator::is_authorized). +pub struct AuthorizationRequestHandler { + receiver: mpsc::UnboundedReceiver>, +} + +impl AuthorizationRequestHandler { + fn new(receiver: mpsc::UnboundedReceiver>) -> Self { + Self { receiver } + } + + pub async fn handle_authorization bool>( + &mut self, + handler: F, + ) -> Result<(), AuthorizatorError> { + let next = self + .receiver + .next() + .await + .ok_or(AuthorizatorError::MissingService)?; + + next.handle_authorization(handler) + } +} + +#[derive(Clone)] +pub struct Authorizator { + sender: mpsc::UnboundedSender>, +} + +/// `Authorizator` is responsible for authorization of public-keys for the validator-network component. Each call to +/// [is_authorized](Authorizator::is_authorized) should be followed by a call of +/// [handle_authorization](AuthorizationHandler::handle_authorization). +impl Authorizator { + pub fn new() -> (Self, AuthorizationRequestHandler) { + let (sender, receiver) = mpsc::unbounded(); + (Self { sender }, AuthorizationRequestHandler::new(receiver)) + } +} + +#[async_trait::async_trait] +impl Authorization for Authorizator { + async fn is_authorized(&self, value: PK) -> Result { + let (handler, receiver) = AuthorizationHandler::new(value); + self.sender + .unbounded_send(handler) + .map_err(|_| AuthorizatorError::MissingService)?; + receiver + .await + .map_err(|_| AuthorizatorError::ServiceDisappeared) + } +} + +#[cfg(test)] +mod tests { + use futures::join; + + use crate::network::clique::{ + authorization::{Authorization, Authorizator, AuthorizatorError}, + mock::{key, MockSecretKey}, + SecretKey, + }; + + #[tokio::test] + async fn authorization_sanity_check() { + let (authorizator, mut request_handler) = + Authorizator::<::PublicKey>::new(); + let public_key = key().0; + let (authorizator_result, request_handler_result) = join!( + authorizator.is_authorized(public_key.clone()), + request_handler.handle_authorization(|_| true), + ); + + assert_eq!( + authorizator_result.expect("Authorizator should return Ok."), + true + ); + assert_eq!( + request_handler_result.expect("Request handler should return Ok."), + () + ); + + let (authorizator_result, request_handler_result) = join!( + authorizator.is_authorized(public_key), + request_handler.handle_authorization(|_| false), + ); + + assert_eq!( + authorizator_result.expect("Authorizator should return Ok."), + false + ); + assert_eq!( + request_handler_result.expect("Request handler should return Ok."), + () + ); + } + + #[tokio::test] + async fn authorizator_returns_error_when_handler_is_dropped() { + let (authorizator, request_handler) = + Authorizator::<::PublicKey>::new(); + let public_key = key().0; + drop(request_handler); + let result = authorizator.is_authorized(public_key.clone()).await; + + assert_eq!(result, Err(AuthorizatorError::MissingService)) + } + + #[tokio::test] + async fn authorizator_returns_error_when_handler_disappeared() { + let (authorizator, mut request_handler) = + Authorizator::<::PublicKey>::new(); + let public_key = key().0; + let (authorizator_result, _) = join!( + authorizator.is_authorized(public_key.clone()), + tokio::spawn(async move { + request_handler + .handle_authorization(|_| panic!("handler bye bye")) + .await + }), + ); + + assert_eq!( + authorizator_result, + Err(AuthorizatorError::ServiceDisappeared) + ) + } + + #[tokio::test] + async fn authorization_request_handler_returns_error_when_all_authorizators_are_missing() { + let (authorizator, mut request_handler) = + Authorizator::<::PublicKey>::new(); + drop(authorizator); + let result = request_handler.handle_authorization(|_| true).await; + + assert_eq!(result, Err(AuthorizatorError::MissingService)) + } +} diff --git a/finality-aleph/src/network/clique/incoming.rs b/finality-aleph/src/network/clique/incoming.rs index cab609af98..578856f647 100644 --- a/finality-aleph/src/network/clique/incoming.rs +++ b/finality-aleph/src/network/clique/incoming.rs @@ -4,6 +4,7 @@ use futures::channel::mpsc; use log::{debug, info}; use crate::network::clique::{ + authorization::Authorizator, protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, Data, PublicKey, SecretKey, Splittable, LOG_TARGET, }; @@ -40,6 +41,7 @@ async fn manage_incoming( stream: S, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, + authorizator: Authorizator, ) -> Result<(), IncomingError> { debug!( target: LOG_TARGET, @@ -48,7 +50,13 @@ async fn manage_incoming( let (stream, protocol) = protocol(stream).await?; debug!(target: LOG_TARGET, "Negotiated protocol, running."); Ok(protocol - .manage_incoming(stream, secret_key, result_for_parent, data_for_user) + .manage_incoming( + stream, + secret_key, + result_for_parent, + data_for_user, + authorizator, + ) .await?) } @@ -62,9 +70,18 @@ pub async fn incoming( stream: S, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, + authorizator: Authorizator, ) { let addr = stream.peer_address_info(); - if let Err(e) = manage_incoming(secret_key, stream, result_for_parent, data_for_user).await { + if let Err(e) = manage_incoming( + secret_key, + stream, + result_for_parent, + data_for_user, + authorizator, + ) + .await + { info!( target: LOG_TARGET, "Incoming connection from {} failed: {}.", addr, e diff --git a/finality-aleph/src/network/clique/manager/mod.rs b/finality-aleph/src/network/clique/manager/mod.rs index cb15c3db81..17f5172a8c 100644 --- a/finality-aleph/src/network/clique/manager/mod.rs +++ b/finality-aleph/src/network/clique/manager/mod.rs @@ -232,6 +232,10 @@ impl Manager { pub fn status_report(&self) -> impl Display { ManagerStatus::new(self) } + + pub fn is_authorized(&self, public_key: &PK) -> bool { + self.wanted.interested(public_key) + } } #[cfg(test)] diff --git a/finality-aleph/src/network/clique/mod.rs b/finality-aleph/src/network/clique/mod.rs index 02e30db490..d40b64cc77 100644 --- a/finality-aleph/src/network/clique/mod.rs +++ b/finality-aleph/src/network/clique/mod.rs @@ -5,6 +5,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use crate::network::Data; +mod authorization; mod crypto; mod incoming; mod io; diff --git a/finality-aleph/src/network/clique/protocols/handshake.rs b/finality-aleph/src/network/clique/protocols/handshake.rs index e7720a8880..2125718189 100644 --- a/finality-aleph/src/network/clique/protocols/handshake.rs +++ b/finality-aleph/src/network/clique/protocols/handshake.rs @@ -180,6 +180,40 @@ pub async fn v0_handshake_outgoing( .map_err(|_| HandshakeError::TimedOut)? } +#[async_trait::async_trait] +pub trait Handshake { + async fn handshake_incoming( + stream: S, + secret_key: SK, + ) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError>; + + async fn handshake_outgoing( + stream: S, + secret_key: SK, + public_key: SK::PublicKey, + ) -> Result<(S::Sender, S::Receiver), HandshakeError>; +} + +pub struct DefaultHandshake; + +#[async_trait::async_trait] +impl Handshake for DefaultHandshake { + async fn handshake_incoming( + stream: S, + secret_key: SK, + ) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError> { + v0_handshake_incoming(stream, secret_key).await + } + + async fn handshake_outgoing( + stream: S, + secret_key: SK, + public_key: SK::PublicKey, + ) -> Result<(S::Sender, S::Receiver), HandshakeError> { + v0_handshake_outgoing(stream, secret_key, public_key).await + } +} + #[cfg(test)] mod tests { use futures::{join, try_join}; diff --git a/finality-aleph/src/network/clique/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs index eb056caca8..1c660c618b 100644 --- a/finality-aleph/src/network/clique/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -3,6 +3,7 @@ use std::fmt::{Display, Error as FmtError, Formatter}; use futures::channel::mpsc; use crate::network::clique::{ + authorization::Authorizator, io::{ReceiveError, SendError}, Data, PublicKey, SecretKey, Splittable, }; @@ -12,8 +13,9 @@ mod negotiation; mod v0; mod v1; -use handshake::HandshakeError; +pub use handshake::{Handshake, HandshakeError}; pub use negotiation::{protocol, ProtocolNegotiationError}; +pub use v0::handle_authorization; pub type Version = u32; @@ -57,6 +59,8 @@ pub enum ProtocolError { NoParentConnection, /// Data channel closed. NoUserConnection, + /// Authorization error. + NotAuthorized, } impl Display for ProtocolError { @@ -69,6 +73,7 @@ impl Display for ProtocolError { CardiacArrest => write!(f, "heartbeat stopped"), NoParentConnection => write!(f, "cannot send result to service"), NoUserConnection => write!(f, "cannot send data to user"), + NotAuthorized => write!(f, "user not authorized"), } } } @@ -103,13 +108,32 @@ impl Protocol { &self, stream: S, secret_key: SK, - result_for_service: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, + authorizator: Authorizator, ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::incoming(stream, secret_key, result_for_service, data_for_user).await, - V1 => v1::incoming(stream, secret_key, result_for_service, data_for_user).await, + V0 => { + v0::incoming( + stream, + secret_key, + authorizator, + result_for_parent, + data_for_user, + ) + .await + } + V1 => { + v1::incoming( + stream, + secret_key, + authorizator, + result_for_parent, + data_for_user, + ) + .await + } } } diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 5d93a14f14..42d5395a7e 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -1,13 +1,11 @@ use futures::{channel::mpsc, StreamExt}; -use log::{debug, info, trace}; +use log::{debug, info, trace, warn}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::network::clique::{ + authorization::{Authorization, AuthorizatorError}, io::{receive_data, send_data}, - protocols::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing}, - ConnectionType, ProtocolError, ResultForService, - }, + protocols::{ConnectionType, ProtocolError, ResultForService}, Data, PublicKey, SecretKey, Splittable, LOG_TARGET, }; @@ -15,6 +13,8 @@ mod heartbeat; use heartbeat::{heartbeat_receiver, heartbeat_sender}; +use super::handshake::{DefaultHandshake, Handshake}; + /// Receives data from the parent service and sends it over the network. /// Exits when the parent channel is closed, or if the network connection is broken. async fn sending( @@ -37,9 +37,19 @@ pub async fn outgoing( secret_key: SK, public_key: SK::PublicKey, result_for_parent: mpsc::UnboundedSender>, +) -> Result<(), ProtocolError> { + handle_outgoing::<_, _, _, DefaultHandshake>(stream, secret_key, public_key, result_for_parent) + .await +} + +pub async fn handle_outgoing>( + stream: S, + secret_key: SK, + public_key: SK::PublicKey, + result_for_parent: mpsc::UnboundedSender>, ) -> Result<(), ProtocolError> { trace!(target: LOG_TARGET, "Extending hand to {}.", public_key); - let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; + let (sender, receiver) = H::handshake_outgoing(stream, secret_key, public_key.clone()).await?; info!( target: LOG_TARGET, "Outgoing handshake with {} finished successfully.", public_key @@ -85,19 +95,54 @@ async fn receiving( /// Performs the handshake, and then keeps sending data received from the network to the parent service. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn incoming( +pub async fn incoming>( + stream: S, + secret_key: SK, + authorizator: A, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + handle_incoming::<_, _, _, _, DefaultHandshake>( + stream, + secret_key, + authorizator, + result_for_parent, + data_for_user, + ) + .await +} + +pub async fn handle_incoming< + SK: SecretKey, + D: Data, + S: Splittable, + A: Authorization, + H: Handshake, +>( stream: S, secret_key: SK, + authorizator: A, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: LOG_TARGET, "Waiting for extended hand..."); - let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; + let (sender, receiver, public_key) = H::handshake_incoming(stream, secret_key).await?; info!( target: LOG_TARGET, "Incoming handshake with {} finished successfully.", public_key ); + let authorized = handle_authorization::(authorizator, public_key.clone()) + .await + .map_err(|_| ProtocolError::NotAuthorized)?; + if !authorized { + warn!( + target: LOG_TARGET, + "public_key={} was not authorized.", public_key + ); + return Ok(()); + } + let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent .unbounded_send(( @@ -123,15 +168,48 @@ pub async fn incoming( } } +pub async fn handle_authorization>( + authorizator: A, + public_key: SK::PublicKey, +) -> Result { + let authorization_result = authorizator.is_authorized(public_key.clone()).await; + match authorization_result { + Ok(result) => Ok(result), + Err(error) => { + match error { + AuthorizatorError::MissingService => warn!( + target: LOG_TARGET, + "Authorization service for public_key={} went missing before we called it.", + public_key + ), + AuthorizatorError::ServiceDisappeared => warn!( + target: LOG_TARGET, + "We managed to send authorization request for public_key={}, but were unable to receive an answer.", + public_key + ), + }; + Err(()) + } + } +} + #[cfg(test)] -mod tests { +pub mod tests { use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; + use parking_lot::Mutex; - use super::{incoming, outgoing, ProtocolError}; + use super::handle_incoming; use crate::network::clique::{ - mock::{key, MockPrelims, MockSplittable}, - protocols::ConnectionType, - Data, + authorization::Authorization, + mock::{ + key, new_authorizer, IteratorWrapper, MockAuthorizer, MockPrelims, MockSplittable, + MockWrappedSplittable, NoHandshake, WrappingReader, WrappingWriter, + }, + protocols::{ + v0::{incoming, outgoing}, + ConnectionType, Handshake, ProtocolError, ResultForService, + }, + Data, SecretKey, Splittable, }; fn prepare() -> MockPrelims { @@ -145,6 +223,7 @@ mod tests { let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), + new_authorizer(), incoming_result_for_service, data_for_user, )); diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index 701baeb244..447339556c 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -1,15 +1,17 @@ use codec::{Decode, Encode}; use futures::{channel::mpsc, StreamExt}; -use log::{debug, info, trace}; +use log::{debug, info, trace, warn}; use tokio::{ io::{AsyncRead, AsyncWrite}, time::{timeout, Duration}, }; use crate::network::clique::{ + authorization::Authorization, io::{receive_data, send_data}, protocols::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + handle_authorization, + handshake::{DefaultHandshake, Handshake}, ConnectionType, ProtocolError, ResultForService, }, Data, PublicKey, SecretKey, Splittable, LOG_TARGET, @@ -91,9 +93,26 @@ pub async fn outgoing( public_key: SK::PublicKey, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + handle_outgoing::<_, _, _, DefaultHandshake>( + stream, + secret_key, + public_key, + result_for_parent, + data_for_user, + ) + .await +} + +pub async fn handle_outgoing>( + stream: S, + secret_key: SK, + public_key: SK::PublicKey, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: LOG_TARGET, "Extending hand to {}.", public_key); - let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; + let (sender, receiver) = H::handshake_outgoing(stream, secret_key, public_key.clone()).await?; info!( target: LOG_TARGET, "Outgoing handshake with {} finished successfully.", public_key @@ -106,6 +125,7 @@ pub async fn outgoing( ConnectionType::New, )) .map_err(|_| ProtocolError::NoParentConnection)?; + debug!( target: LOG_TARGET, "Starting worker for communicating with {}.", public_key @@ -116,18 +136,54 @@ pub async fn outgoing( /// Performs the incoming handshake, and then manages a connection sending and receiving data. /// Exits on parent request (when the data source is dropped), or in case of broken or dead /// network connection. -pub async fn incoming( +pub async fn incoming>( + stream: S, + secret_key: SK, + authorizator: A, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + handle_incoming::<_, _, _, _, DefaultHandshake>( + stream, + secret_key, + authorizator, + result_for_parent, + data_for_user, + ) + .await +} + +pub async fn handle_incoming< + SK: SecretKey, + D: Data, + S: Splittable, + A: Authorization, + H: Handshake, +>( stream: S, secret_key: SK, + authorizator: A, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: LOG_TARGET, "Waiting for extended hand..."); - let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; + let (sender, receiver, public_key) = H::handshake_incoming(stream, secret_key).await?; info!( target: LOG_TARGET, "Incoming handshake with {} finished successfully.", public_key ); + + let authorized = handle_authorization::(authorizator, public_key.clone()) + .await + .map_err(|_| ProtocolError::NotAuthorized)?; + if !authorized { + warn!( + target: LOG_TARGET, + "public_key={} was not authorized.", public_key + ); + return Ok(()); + } + let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent .unbounded_send(( @@ -147,11 +203,15 @@ pub async fn incoming( mod tests { use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; - use super::{incoming, outgoing, ProtocolError}; use crate::network::clique::{ - mock::{key, MockPrelims, MockSplittable}, - protocols::ConnectionType, - Data, + authorization::Authorization, + mock::{key, new_authorizer, MockPrelims, MockSplittable}, + protocols::{ + v0::tests::{execute_do_not_call_sender_and_receiver_until_authorized, HandleIncoming}, + v1::{handle_incoming, incoming, outgoing}, + ConnectionType, Handshake, ProtocolError, ResultForService, + }, + Data, SecretKey, Splittable, }; fn prepare() -> MockPrelims { @@ -163,9 +223,11 @@ mod tests { let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (incoming_data_for_user, data_from_incoming) = mpsc::unbounded::(); let (outgoing_data_for_user, data_from_outgoing) = mpsc::unbounded::(); + let authorizer = new_authorizer(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), + authorizer, incoming_result_for_service, incoming_data_for_user, )); diff --git a/finality-aleph/src/network/clique/service.rs b/finality-aleph/src/network/clique/service.rs index 470ca0898c..d6c999c5a8 100644 --- a/finality-aleph/src/network/clique/service.rs +++ b/finality-aleph/src/network/clique/service.rs @@ -10,6 +10,7 @@ use tokio::time; use crate::{ network::{ clique::{ + authorization::Authorizator, incoming::incoming, manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, @@ -126,7 +127,7 @@ where } fn spawn_new_outgoing( - &self, + &mut self, public_key: SK::PublicKey, address: A, result_for_parent: mpsc::UnboundedSender>, @@ -152,12 +153,20 @@ where &self, stream: NL::Connection, result_for_parent: mpsc::UnboundedSender>, + authorizator: Authorizator, ) { let secret_key = self.secret_key.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/clique_network_incoming", None, async move { - incoming(secret_key, stream, result_for_parent, next_to_interface).await; + incoming( + secret_key, + stream, + result_for_parent, + next_to_interface, + authorizator, + ) + .await; }); } @@ -233,12 +242,14 @@ where pub async fn run(mut self, mut exit: oneshot::Receiver<()>) { let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); let (result_for_parent, mut worker_results) = mpsc::unbounded(); + let (authorizator, mut authorization_handler) = Authorizator::new(); use ServiceCommand::*; loop { + let manager = &self.manager; tokio::select! { // got new incoming connection from the listener - spawn an incoming worker maybe_stream = self.listener.accept() => match maybe_stream { - Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone()), + Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone(), authorizator.clone()), Err(e) => warn!(target: LOG_TARGET, "Listener failed to accept connection: {}", e), }, // got a new command from the interface @@ -276,6 +287,11 @@ where } }, }, + result = authorization_handler.handle_authorization(|pk| manager.is_authorized(&pk)) => { + if result.is_err() { + warn!(target: LOG_TARGET, "Other side of the Authorization Service is already closed."); + } + }, // received information from a spawned worker managing a connection // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection Some((public_key, maybe_data_for_network, connection_type)) = worker_results.next() => { From 58f38ebd95da7a68c01cb2f80058aed38c5dcfa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Fri, 27 Jan 2023 14:58:11 +0100 Subject: [PATCH 02/14] unit-tests for the authorization api for the validator-network --- finality-aleph/src/network/clique/mock.rs | 222 +++++++++++++++++- .../src/network/clique/protocols/v0/mod.rs | 86 +++++++ .../src/network/clique/protocols/v1/mod.rs | 34 +++ 3 files changed, 341 insertions(+), 1 deletion(-) diff --git a/finality-aleph/src/network/clique/mock.rs b/finality-aleph/src/network/clique/mock.rs index 13ade25354..87c27dbc9e 100644 --- a/finality-aleph/src/network/clique/mock.rs +++ b/finality-aleph/src/network/clique/mock.rs @@ -2,6 +2,7 @@ use std::{ collections::HashMap, fmt::{Display, Error as FmtError, Formatter}, io::Result as IoResult, + marker::PhantomData, pin::Pin, task::{Context, Poll}, }; @@ -17,7 +18,8 @@ use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf}; use crate::network::{ clique::{ - protocols::{ProtocolError, ResultForService}, + authorization::{Authorization, AuthorizatorError}, + protocols::{Handshake, HandshakeError, ProtocolError, ResultForService}, ConnectionInfo, Dialer, Listener, Network, PeerAddressInfo, PublicKey, SecretKey, Splittable, LOG_TARGET, }, @@ -165,6 +167,67 @@ impl Splittable for MockSplittable { } } +pub struct MockWrappedSplittable { + reader: R, + writer: W, +} + +impl MockWrappedSplittable { + pub fn new(reader: R, writer: W) -> Self { + Self { reader, writer } + } +} + +impl AsyncWrite for MockWrappedSplittable { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) + } +} + +impl AsyncRead for MockWrappedSplittable { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().reader).poll_read(cx, buf) + } +} + +impl ConnectionInfo for MockWrappedSplittable { + fn peer_address_info(&self) -> PeerAddressInfo { + String::from("MOCK_WRAPPED_ADDRESS") + } +} + +impl< + R: AsyncRead + Unpin + Send + ConnectionInfo, + W: AsyncWrite + Unpin + Send + ConnectionInfo, + > Splittable for MockWrappedSplittable +{ + type Sender = W; + type Receiver = R; + + fn split(self) -> (Self::Sender, Self::Receiver) { + (self.writer, self.reader) + } +} + #[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] pub struct MockAddressingInformation { peer_id: MockPublicKey, @@ -555,3 +618,160 @@ pub struct MockPrelims { pub result_from_incoming: UnboundedReceiver>, pub result_from_outgoing: UnboundedReceiver>, } + +pub struct MockAuthorizer { + check: C, + _phantom_data: PhantomData, +} + +impl MockAuthorizer { + pub fn new_with_closure(check: C) -> MockAuthorizer { + MockAuthorizer { + check, + _phantom_data: PhantomData, + } + } +} + +pub fn new_authorizer() -> MockAuthorizer bool> { + MockAuthorizer::new_with_closure(|_: PK| true) +} + +#[async_trait::async_trait] +impl bool + Send + Sync> Authorization for MockAuthorizer { + async fn is_authorized(&self, public_key: PK) -> Result { + Ok((self.check)(public_key)) + } +} + +pub struct NoHandshake; + +#[async_trait::async_trait] +impl Handshake for NoHandshake { + async fn handshake_incoming( + stream: S, + _: MockSecretKey, + ) -> Result< + ( + S::Sender, + S::Receiver, + ::PublicKey, + ), + HandshakeError<::PublicKey>, + > { + let (sender, receiver) = stream.split(); + Ok((sender, receiver, key().0)) + } + + async fn handshake_outgoing( + stream: S, + _secret_key: MockSecretKey, + _public_key: ::PublicKey, + ) -> Result<(S::Sender, S::Receiver), HandshakeError<::PublicKey>> + { + let (sender, receiver) = stream.split(); + Ok((sender, receiver)) + } +} + +pub struct WrappingReader { + action: A, + reader: R, +} + +impl WrappingReader { + pub fn new_with_closure(reader: R, closure: A) -> Self { + Self { + action: closure, + reader, + } + } +} + +impl AsyncRead for WrappingReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let self_mut = self.get_mut(); + (self_mut.action)(); + Pin::new(&mut self_mut.reader).poll_read(cx, buf) + } +} + +impl ConnectionInfo for WrappingReader { + fn peer_address_info(&self) -> crate::network::clique::PeerAddressInfo { + String::from("WRAPPING_READER") + } +} + +pub struct IteratorWrapper(I); + +impl IteratorWrapper { + pub fn new(iterator: I) -> Self { + Self(iterator) + } +} + +impl + Unpin> AsyncRead for IteratorWrapper { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let iter = &mut self.get_mut().0; + let buffer = buf.initialize_unfilled(); + let remaining = buffer.len(); + for cell in buffer.iter_mut() { + match iter.next() { + Some(next) => *cell = next, + None => { + return Poll::Pending; + } + } + } + buf.advance(remaining); + Poll::Ready(Result::Ok(())) + } +} + +pub struct WrappingWriter { + action: A, + writer: W, +} + +impl WrappingWriter { + pub fn new_with_closure(writer: W, action: A) -> Self { + Self { action, writer } + } +} + +impl AsyncWrite for WrappingWriter { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let self_mut = self.get_mut(); + (self_mut.action)(); + AsyncWrite::poll_write(Pin::new(&mut self_mut.writer), cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + AsyncWrite::poll_flush(Pin::new(&mut self.get_mut().writer), cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + AsyncWrite::poll_shutdown(Pin::new(&mut self.get_mut().writer), cx) + } +} + +impl ConnectionInfo for WrappingWriter { + fn peer_address_info(&self) -> crate::network::clique::PeerAddressInfo { + String::from("WRAPPING_WRITER") + } +} diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 42d5395a7e..20c0c95e22 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -500,4 +500,90 @@ pub mod tests { Ok(_) => panic!("successfully finished when connection dead"), }; } + + #[async_trait::async_trait] + pub trait HandleIncoming { + async fn handle_incoming( + stream: S, + secret_key: SK, + authorization: A, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> + where + SK: SecretKey, + D: Data, + S: Splittable, + A: Authorization + Send + Sync, + H: Handshake; + } + + pub async fn execute_do_not_call_sender_and_receiver_until_authorized() { + let writer = WrappingWriter::new_with_closure(Vec::new(), move || { + panic!("Writer should not be called."); + }); + let reader = WrappingReader::new_with_closure( + IteratorWrapper::new([0].into_iter().cycle()), + move || { + panic!("Reader should not be called."); + }, + ); + let stream = MockWrappedSplittable::new(reader, writer); + let (result_for_parent, _) = mpsc::unbounded(); + let (data_for_user, _) = mpsc::unbounded::>(); + + let authorizer_called = Mutex::new(false); + let authorizer = MockAuthorizer::new_with_closure(|_| { + *authorizer_called.lock() = true; + false + }); + let (_, secret_key) = key(); + + // It should exit immediately after we reject authorization. + // `NoHandshake` mocks the real handshake procedure and it does not call reader nor writer. + let result = H::handle_incoming::<_, _, _, _, NoHandshake>( + stream, + secret_key, + authorizer, + result_for_parent, + data_for_user, + ) + .await; + assert!(result.is_ok()); + assert!(*authorizer_called.lock()); + } + + struct V0HandleIncoming; + + #[async_trait::async_trait] + impl HandleIncoming for V0HandleIncoming { + async fn handle_incoming( + stream: S, + secret_key: SK, + authorization: A, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> + where + SK: SecretKey, + D: Data, + S: Splittable, + A: Authorization + Send + Sync, + H: Handshake, + { + handle_incoming::<_, _, _, _, H>( + stream, + secret_key, + authorization, + result_for_parent, + data_for_user, + ) + .await + } + } + + #[tokio::test] + async fn do_not_call_sender_and_receiver_until_authorized() { + execute_do_not_call_sender_and_receiver_until_authorized::().await + } } diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index 447339556c..8778c2462f 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -490,4 +490,38 @@ mod tests { Ok(_) => panic!("successfully finished when connection dead"), }; } + + struct V1HandleIncoming; + + #[async_trait::async_trait] + impl HandleIncoming for V1HandleIncoming { + async fn handle_incoming( + stream: S, + secret_key: SK, + authorization: A, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> + where + SK: SecretKey, + D: Data, + S: Splittable, + A: Authorization + Send + Sync, + H: Handshake, + { + handle_incoming::<_, _, _, _, H>( + stream, + secret_key, + authorization, + result_for_parent, + data_for_user, + ) + .await + } + } + + #[tokio::test] + async fn do_not_call_sender_and_receiver_until_authorized() { + execute_do_not_call_sender_and_receiver_until_authorized::().await + } } From 7baf8705579579713e68f99397414e76202f60e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Fri, 27 Jan 2023 14:58:50 +0100 Subject: [PATCH 03/14] fixed path in the script for synthetic-network --- scripts/synthetic-network/run_script_for_synthetic-network.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/synthetic-network/run_script_for_synthetic-network.sh b/scripts/synthetic-network/run_script_for_synthetic-network.sh index 419df5ad4b..7a8e88b487 100755 --- a/scripts/synthetic-network/run_script_for_synthetic-network.sh +++ b/scripts/synthetic-network/run_script_for_synthetic-network.sh @@ -50,7 +50,7 @@ if [[ "$UPDATE" = true ]]; then git submodule update fi -cd synthetic-network/frontend +cd scripts/synthetic-network/vendor/synthetic-network/frontend log "running .js script" node $SCRIPT_PATH ${@:1} From f3f69bbbc628d1b66e6e88aa760bdb54a33c4d9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Fri, 27 Jan 2023 15:12:47 +0100 Subject: [PATCH 04/14] simplified authorization impl for validator network --- .../src/network/clique/authorization.rs | 46 +++++-------------- 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/finality-aleph/src/network/clique/authorization.rs b/finality-aleph/src/network/clique/authorization.rs index c71174cab6..de1b6f8ce1 100644 --- a/finality-aleph/src/network/clique/authorization.rs +++ b/finality-aleph/src/network/clique/authorization.rs @@ -15,43 +15,15 @@ pub trait Authorization { async fn is_authorized(&self, value: PK) -> Result; } -struct AuthorizationHandler { - identifier: PK, - result_sender: oneshot::Sender, -} - -impl AuthorizationHandler { - fn new(result: PK) -> (Self, oneshot::Receiver) { - let (auth_sender, auth_receiver) = oneshot::channel(); - ( - Self { - identifier: result, - result_sender: auth_sender, - }, - auth_receiver, - ) - } - - pub fn handle_authorization( - self, - handler: impl FnOnce(PK) -> bool, - ) -> Result<(), AuthorizatorError> { - let auth_result = handler(self.identifier); - self.result_sender - .send(auth_result) - .map_err(|_| AuthorizatorError::MissingService) - } -} - /// Used for validation of authorization requests. One should call [handle_authorization](Self::handle_authorization) and /// provide a callback responsible for authorization. Each such call should be matched with call to /// [Authorizator::is_authorized](Authorizator::is_authorized). pub struct AuthorizationRequestHandler { - receiver: mpsc::UnboundedReceiver>, + receiver: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, } impl AuthorizationRequestHandler { - fn new(receiver: mpsc::UnboundedReceiver>) -> Self { + fn new(receiver: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>) -> Self { Self { receiver } } @@ -59,19 +31,23 @@ impl AuthorizationRequestHandler { &mut self, handler: F, ) -> Result<(), AuthorizatorError> { - let next = self + let (identifier, result_sender) = self .receiver .next() .await .ok_or(AuthorizatorError::MissingService)?; - next.handle_authorization(handler) + let auth_result = handler(identifier); + result_sender + .send(auth_result) + .map_err(|_| AuthorizatorError::MissingService)?; + Ok(()) } } #[derive(Clone)] pub struct Authorizator { - sender: mpsc::UnboundedSender>, + sender: mpsc::UnboundedSender<(PK, oneshot::Sender)>, } /// `Authorizator` is responsible for authorization of public-keys for the validator-network component. Each call to @@ -87,9 +63,9 @@ impl Authorizator { #[async_trait::async_trait] impl Authorization for Authorizator { async fn is_authorized(&self, value: PK) -> Result { - let (handler, receiver) = AuthorizationHandler::new(value); + let (sender, receiver) = oneshot::channel(); self.sender - .unbounded_send(handler) + .unbounded_send((value, sender)) .map_err(|_| AuthorizatorError::MissingService)?; receiver .await From e2bc52e4b400249e70509ef48a500abe8ac03f1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Fri, 27 Jan 2023 20:24:01 +0100 Subject: [PATCH 05/14] authorization: lint --- .../src/network/clique/authorization.rs | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/finality-aleph/src/network/clique/authorization.rs b/finality-aleph/src/network/clique/authorization.rs index de1b6f8ce1..c47c4dac91 100644 --- a/finality-aleph/src/network/clique/authorization.rs +++ b/finality-aleph/src/network/clique/authorization.rs @@ -93,13 +93,10 @@ mod tests { request_handler.handle_authorization(|_| true), ); - assert_eq!( - authorizator_result.expect("Authorizator should return Ok."), - true - ); - assert_eq!( - request_handler_result.expect("Request handler should return Ok."), - () + assert!(authorizator_result.expect("Authorizator should return Ok.")); + assert!( + request_handler_result.is_ok(), + "Request handler should return Ok." ); let (authorizator_result, request_handler_result) = join!( @@ -107,13 +104,10 @@ mod tests { request_handler.handle_authorization(|_| false), ); - assert_eq!( - authorizator_result.expect("Authorizator should return Ok."), - false - ); - assert_eq!( - request_handler_result.expect("Request handler should return Ok."), - () + assert!(!authorizator_result.expect("Authorizator should return Ok.")); + assert!( + request_handler_result.is_ok(), + "Request handler should return Ok." ); } From 63053ddc028581c14137c3ff8bf3bbf71bfee0ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Tue, 31 Jan 2023 15:26:43 +0100 Subject: [PATCH 06/14] more verbose types for the authorization api --- .../src/network/clique/authorization.rs | 23 +++++++++++++------ finality-aleph/src/network/clique/service.rs | 14 +++++++---- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/finality-aleph/src/network/clique/authorization.rs b/finality-aleph/src/network/clique/authorization.rs index c47c4dac91..64292ba35d 100644 --- a/finality-aleph/src/network/clique/authorization.rs +++ b/finality-aleph/src/network/clique/authorization.rs @@ -9,6 +9,11 @@ pub enum AuthorizatorError { ServiceDisappeared, } +pub enum AuthorizationResult { + Authorized, + NotAuthorized, +} + /// Allows one to authorize incoming public-keys. #[async_trait::async_trait] pub trait Authorization { @@ -19,15 +24,15 @@ pub trait Authorization { /// provide a callback responsible for authorization. Each such call should be matched with call to /// [Authorizator::is_authorized](Authorizator::is_authorized). pub struct AuthorizationRequestHandler { - receiver: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + receiver: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, } impl AuthorizationRequestHandler { - fn new(receiver: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>) -> Self { + fn new(receiver: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>) -> Self { Self { receiver } } - pub async fn handle_authorization bool>( + pub async fn handle_authorization AuthorizationResult>( &mut self, handler: F, ) -> Result<(), AuthorizatorError> { @@ -47,7 +52,7 @@ impl AuthorizationRequestHandler { #[derive(Clone)] pub struct Authorizator { - sender: mpsc::UnboundedSender<(PK, oneshot::Sender)>, + sender: mpsc::UnboundedSender<(PK, oneshot::Sender)>, } /// `Authorizator` is responsible for authorization of public-keys for the validator-network component. Each call to @@ -70,6 +75,10 @@ impl Authorization for Authorizator { receiver .await .map_err(|_| AuthorizatorError::ServiceDisappeared) + .map(|auth_result| match auth_result { + AuthorizationResult::Authorized => true, + AuthorizationResult::NotAuthorized => false, + }) } } @@ -78,7 +87,7 @@ mod tests { use futures::join; use crate::network::clique::{ - authorization::{Authorization, Authorizator, AuthorizatorError}, + authorization::{Authorization, AuthorizationResult, Authorizator, AuthorizatorError}, mock::{key, MockSecretKey}, SecretKey, }; @@ -90,7 +99,7 @@ mod tests { let public_key = key().0; let (authorizator_result, request_handler_result) = join!( authorizator.is_authorized(public_key.clone()), - request_handler.handle_authorization(|_| true), + request_handler.handle_authorization(|_| AuthorizationResult::Authorized), ); assert!(authorizator_result.expect("Authorizator should return Ok.")); @@ -101,7 +110,7 @@ mod tests { let (authorizator_result, request_handler_result) = join!( authorizator.is_authorized(public_key), - request_handler.handle_authorization(|_| false), + request_handler.handle_authorization(|_| AuthorizationResult::NotAuthorized), ); assert!(!authorizator_result.expect("Authorizator should return Ok.")); diff --git a/finality-aleph/src/network/clique/service.rs b/finality-aleph/src/network/clique/service.rs index d6c999c5a8..9ca09c0f99 100644 --- a/finality-aleph/src/network/clique/service.rs +++ b/finality-aleph/src/network/clique/service.rs @@ -10,7 +10,7 @@ use tokio::time; use crate::{ network::{ clique::{ - authorization::Authorizator, + authorization::{AuthorizationResult, Authorizator}, incoming::incoming, manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, @@ -245,10 +245,10 @@ where let (authorizator, mut authorization_handler) = Authorizator::new(); use ServiceCommand::*; loop { - let manager = &self.manager; + let listener = &mut self.listener; tokio::select! { // got new incoming connection from the listener - spawn an incoming worker - maybe_stream = self.listener.accept() => match maybe_stream { + maybe_stream = listener.accept() => match maybe_stream { Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone(), authorizator.clone()), Err(e) => warn!(target: LOG_TARGET, "Listener failed to accept connection: {}", e), }, @@ -287,7 +287,13 @@ where } }, }, - result = authorization_handler.handle_authorization(|pk| manager.is_authorized(&pk)) => { + result = authorization_handler.handle_authorization(|public_key| { + if self.manager.is_authorized(&public_key) { + AuthorizationResult::Authorized + } else { + AuthorizationResult::NotAuthorized + } + }) => { if result.is_err() { warn!(target: LOG_TARGET, "Other side of the Authorization Service is already closed."); } From 8caf3f9dc0883093203cf46a8f995a1eb0d057cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Tue, 31 Jan 2023 19:36:00 +0100 Subject: [PATCH 07/14] - removed mocks for AsyncWrite/Read used by the the clique authorization tests - tests for authorization in clique-network use now `prepare` instead of mocking the handshake --- .../src/network/clique/authorization.rs | 4 +- finality-aleph/src/network/clique/mock.rs | 224 +----------------- .../src/network/clique/protocols/handshake.rs | 34 --- .../src/network/clique/protocols/mod.rs | 2 +- .../src/network/clique/protocols/v0/mod.rs | 169 ++++--------- .../src/network/clique/protocols/v1/mod.rs | 118 +++------ 6 files changed, 85 insertions(+), 466 deletions(-) diff --git a/finality-aleph/src/network/clique/authorization.rs b/finality-aleph/src/network/clique/authorization.rs index 64292ba35d..b3787c692c 100644 --- a/finality-aleph/src/network/clique/authorization.rs +++ b/finality-aleph/src/network/clique/authorization.rs @@ -156,7 +156,9 @@ mod tests { let (authorizator, mut request_handler) = Authorizator::<::PublicKey>::new(); drop(authorizator); - let result = request_handler.handle_authorization(|_| true).await; + let result = request_handler + .handle_authorization(|_| AuthorizationResult::Authorized) + .await; assert_eq!(result, Err(AuthorizatorError::MissingService)) } diff --git a/finality-aleph/src/network/clique/mock.rs b/finality-aleph/src/network/clique/mock.rs index 87c27dbc9e..ffbad85a44 100644 --- a/finality-aleph/src/network/clique/mock.rs +++ b/finality-aleph/src/network/clique/mock.rs @@ -2,7 +2,6 @@ use std::{ collections::HashMap, fmt::{Display, Error as FmtError, Formatter}, io::Result as IoResult, - marker::PhantomData, pin::Pin, task::{Context, Poll}, }; @@ -16,10 +15,10 @@ use log::info; use rand::Rng; use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf}; +use super::authorization::AuthorizationRequestHandler; use crate::network::{ clique::{ - authorization::{Authorization, AuthorizatorError}, - protocols::{Handshake, HandshakeError, ProtocolError, ResultForService}, + protocols::{ProtocolError, ResultForService}, ConnectionInfo, Dialer, Listener, Network, PeerAddressInfo, PublicKey, SecretKey, Splittable, LOG_TARGET, }, @@ -167,67 +166,6 @@ impl Splittable for MockSplittable { } } -pub struct MockWrappedSplittable { - reader: R, - writer: W, -} - -impl MockWrappedSplittable { - pub fn new(reader: R, writer: W) -> Self { - Self { reader, writer } - } -} - -impl AsyncWrite for MockWrappedSplittable { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) - } -} - -impl AsyncRead for MockWrappedSplittable { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().reader).poll_read(cx, buf) - } -} - -impl ConnectionInfo for MockWrappedSplittable { - fn peer_address_info(&self) -> PeerAddressInfo { - String::from("MOCK_WRAPPED_ADDRESS") - } -} - -impl< - R: AsyncRead + Unpin + Send + ConnectionInfo, - W: AsyncWrite + Unpin + Send + ConnectionInfo, - > Splittable for MockWrappedSplittable -{ - type Sender = W; - type Receiver = R; - - fn split(self) -> (Self::Sender, Self::Receiver) { - (self.writer, self.reader) - } -} - #[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] pub struct MockAddressingInformation { peer_id: MockPublicKey, @@ -617,161 +555,5 @@ pub struct MockPrelims { pub data_from_outgoing: Option>, pub result_from_incoming: UnboundedReceiver>, pub result_from_outgoing: UnboundedReceiver>, -} - -pub struct MockAuthorizer { - check: C, - _phantom_data: PhantomData, -} - -impl MockAuthorizer { - pub fn new_with_closure(check: C) -> MockAuthorizer { - MockAuthorizer { - check, - _phantom_data: PhantomData, - } - } -} - -pub fn new_authorizer() -> MockAuthorizer bool> { - MockAuthorizer::new_with_closure(|_: PK| true) -} - -#[async_trait::async_trait] -impl bool + Send + Sync> Authorization for MockAuthorizer { - async fn is_authorized(&self, public_key: PK) -> Result { - Ok((self.check)(public_key)) - } -} - -pub struct NoHandshake; - -#[async_trait::async_trait] -impl Handshake for NoHandshake { - async fn handshake_incoming( - stream: S, - _: MockSecretKey, - ) -> Result< - ( - S::Sender, - S::Receiver, - ::PublicKey, - ), - HandshakeError<::PublicKey>, - > { - let (sender, receiver) = stream.split(); - Ok((sender, receiver, key().0)) - } - - async fn handshake_outgoing( - stream: S, - _secret_key: MockSecretKey, - _public_key: ::PublicKey, - ) -> Result<(S::Sender, S::Receiver), HandshakeError<::PublicKey>> - { - let (sender, receiver) = stream.split(); - Ok((sender, receiver)) - } -} - -pub struct WrappingReader { - action: A, - reader: R, -} - -impl WrappingReader { - pub fn new_with_closure(reader: R, closure: A) -> Self { - Self { - action: closure, - reader, - } - } -} - -impl AsyncRead for WrappingReader { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let self_mut = self.get_mut(); - (self_mut.action)(); - Pin::new(&mut self_mut.reader).poll_read(cx, buf) - } -} - -impl ConnectionInfo for WrappingReader { - fn peer_address_info(&self) -> crate::network::clique::PeerAddressInfo { - String::from("WRAPPING_READER") - } -} - -pub struct IteratorWrapper(I); - -impl IteratorWrapper { - pub fn new(iterator: I) -> Self { - Self(iterator) - } -} - -impl + Unpin> AsyncRead for IteratorWrapper { - fn poll_read( - self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let iter = &mut self.get_mut().0; - let buffer = buf.initialize_unfilled(); - let remaining = buffer.len(); - for cell in buffer.iter_mut() { - match iter.next() { - Some(next) => *cell = next, - None => { - return Poll::Pending; - } - } - } - buf.advance(remaining); - Poll::Ready(Result::Ok(())) - } -} - -pub struct WrappingWriter { - action: A, - writer: W, -} - -impl WrappingWriter { - pub fn new_with_closure(writer: W, action: A) -> Self { - Self { action, writer } - } -} - -impl AsyncWrite for WrappingWriter { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let self_mut = self.get_mut(); - (self_mut.action)(); - AsyncWrite::poll_write(Pin::new(&mut self_mut.writer), cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - AsyncWrite::poll_flush(Pin::new(&mut self.get_mut().writer), cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - AsyncWrite::poll_shutdown(Pin::new(&mut self.get_mut().writer), cx) - } -} - -impl ConnectionInfo for WrappingWriter { - fn peer_address_info(&self) -> crate::network::clique::PeerAddressInfo { - String::from("WRAPPING_WRITER") - } + pub authorization_handler: AuthorizationRequestHandler, } diff --git a/finality-aleph/src/network/clique/protocols/handshake.rs b/finality-aleph/src/network/clique/protocols/handshake.rs index 2125718189..e7720a8880 100644 --- a/finality-aleph/src/network/clique/protocols/handshake.rs +++ b/finality-aleph/src/network/clique/protocols/handshake.rs @@ -180,40 +180,6 @@ pub async fn v0_handshake_outgoing( .map_err(|_| HandshakeError::TimedOut)? } -#[async_trait::async_trait] -pub trait Handshake { - async fn handshake_incoming( - stream: S, - secret_key: SK, - ) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError>; - - async fn handshake_outgoing( - stream: S, - secret_key: SK, - public_key: SK::PublicKey, - ) -> Result<(S::Sender, S::Receiver), HandshakeError>; -} - -pub struct DefaultHandshake; - -#[async_trait::async_trait] -impl Handshake for DefaultHandshake { - async fn handshake_incoming( - stream: S, - secret_key: SK, - ) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError> { - v0_handshake_incoming(stream, secret_key).await - } - - async fn handshake_outgoing( - stream: S, - secret_key: SK, - public_key: SK::PublicKey, - ) -> Result<(S::Sender, S::Receiver), HandshakeError> { - v0_handshake_outgoing(stream, secret_key, public_key).await - } -} - #[cfg(test)] mod tests { use futures::{join, try_join}; diff --git a/finality-aleph/src/network/clique/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs index 1c660c618b..2854088095 100644 --- a/finality-aleph/src/network/clique/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -13,7 +13,7 @@ mod negotiation; mod v0; mod v1; -pub use handshake::{Handshake, HandshakeError}; +pub use handshake::HandshakeError; pub use negotiation::{protocol, ProtocolNegotiationError}; pub use v0::handle_authorization; diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 20c0c95e22..4ed808b21c 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -5,7 +5,10 @@ use tokio::io::{AsyncRead, AsyncWrite}; use crate::network::clique::{ authorization::{Authorization, AuthorizatorError}, io::{receive_data, send_data}, - protocols::{ConnectionType, ProtocolError, ResultForService}, + protocols::{ + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, + }, Data, PublicKey, SecretKey, Splittable, LOG_TARGET, }; @@ -13,8 +16,6 @@ mod heartbeat; use heartbeat::{heartbeat_receiver, heartbeat_sender}; -use super::handshake::{DefaultHandshake, Handshake}; - /// Receives data from the parent service and sends it over the network. /// Exits when the parent channel is closed, or if the network connection is broken. async fn sending( @@ -37,19 +38,9 @@ pub async fn outgoing( secret_key: SK, public_key: SK::PublicKey, result_for_parent: mpsc::UnboundedSender>, -) -> Result<(), ProtocolError> { - handle_outgoing::<_, _, _, DefaultHandshake>(stream, secret_key, public_key, result_for_parent) - .await -} - -pub async fn handle_outgoing>( - stream: S, - secret_key: SK, - public_key: SK::PublicKey, - result_for_parent: mpsc::UnboundedSender>, ) -> Result<(), ProtocolError> { trace!(target: LOG_TARGET, "Extending hand to {}.", public_key); - let (sender, receiver) = H::handshake_outgoing(stream, secret_key, public_key.clone()).await?; + let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; info!( target: LOG_TARGET, "Outgoing handshake with {} finished successfully.", public_key @@ -101,32 +92,9 @@ pub async fn incoming>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { - handle_incoming::<_, _, _, _, DefaultHandshake>( - stream, - secret_key, - authorizator, - result_for_parent, - data_for_user, - ) - .await -} - -pub async fn handle_incoming< - SK: SecretKey, - D: Data, - S: Splittable, - A: Authorization, - H: Handshake, ->( - stream: S, - secret_key: SK, - authorizator: A, - result_for_parent: mpsc::UnboundedSender>, - data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: LOG_TARGET, "Waiting for extended hand..."); - let (sender, receiver, public_key) = H::handshake_incoming(stream, secret_key).await?; + let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; info!( target: LOG_TARGET, "Incoming handshake with {} finished successfully.", public_key @@ -196,20 +164,15 @@ pub async fn handle_authorization #[cfg(test)] pub mod tests { use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; - use parking_lot::Mutex; - use super::handle_incoming; use crate::network::clique::{ - authorization::Authorization, - mock::{ - key, new_authorizer, IteratorWrapper, MockAuthorizer, MockPrelims, MockSplittable, - MockWrappedSplittable, NoHandshake, WrappingReader, WrappingWriter, - }, + authorization::{AuthorizationResult, Authorizator}, + mock::{key, MockPrelims, MockSplittable}, protocols::{ v0::{incoming, outgoing}, - ConnectionType, Handshake, ProtocolError, ResultForService, + ConnectionType, ProtocolError, }, - Data, SecretKey, Splittable, + Data, }; fn prepare() -> MockPrelims { @@ -220,10 +183,11 @@ pub mod tests { let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (data_for_user, data_from_incoming) = mpsc::unbounded::(); + let (authorizator, authorization_handler) = Authorizator::new(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), - new_authorizer(), + authorizator, incoming_result_for_service, data_for_user, )); @@ -244,6 +208,7 @@ pub mod tests { data_from_outgoing: None, result_from_incoming, result_from_outgoing, + authorization_handler, } } @@ -501,89 +466,35 @@ pub mod tests { }; } - #[async_trait::async_trait] - pub trait HandleIncoming { - async fn handle_incoming( - stream: S, - secret_key: SK, - authorization: A, - result_for_parent: mpsc::UnboundedSender>, - data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> - where - SK: SecretKey, - D: Data, - S: Splittable, - A: Authorization + Send + Sync, - H: Handshake; - } + #[tokio::test] + async fn do_not_call_sender_and_receiver_until_authorized() { + let MockPrelims { + incoming_handle, + outgoing_handle, + mut data_from_incoming, + mut result_from_incoming, + mut authorization_handler, + .. + } = prepare::>(); - pub async fn execute_do_not_call_sender_and_receiver_until_authorized() { - let writer = WrappingWriter::new_with_closure(Vec::new(), move || { - panic!("Writer should not be called."); - }); - let reader = WrappingReader::new_with_closure( - IteratorWrapper::new([0].into_iter().cycle()), - move || { - panic!("Reader should not be called."); - }, - ); - let stream = MockWrappedSplittable::new(reader, writer); - let (result_for_parent, _) = mpsc::unbounded(); - let (data_for_user, _) = mpsc::unbounded::>(); - - let authorizer_called = Mutex::new(false); - let authorizer = MockAuthorizer::new_with_closure(|_| { - *authorizer_called.lock() = true; - false - }); - let (_, secret_key) = key(); - - // It should exit immediately after we reject authorization. - // `NoHandshake` mocks the real handshake procedure and it does not call reader nor writer. - let result = H::handle_incoming::<_, _, _, _, NoHandshake>( - stream, - secret_key, - authorizer, - result_for_parent, - data_for_user, - ) - .await; - assert!(result.is_ok()); - assert!(*authorizer_called.lock()); - } + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + let authorization_handle = authorization_handler + .handle_authorization(|_| AuthorizationResult::NotAuthorized) + .fuse(); - struct V0HandleIncoming; - - #[async_trait::async_trait] - impl HandleIncoming for V0HandleIncoming { - async fn handle_incoming( - stream: S, - secret_key: SK, - authorization: A, - result_for_parent: mpsc::UnboundedSender>, - data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> - where - SK: SecretKey, - D: Data, - S: Splittable, - A: Authorization + Send + Sync, - H: Handshake, - { - handle_incoming::<_, _, _, _, H>( - stream, - secret_key, - authorization, - result_for_parent, - data_for_user, - ) - .await - } - } + // since we are returning `NotAuthorized` all three should finish hapilly + let (incoming_result, outgoing_result, authorization_result) = + tokio::join!(incoming_handle, outgoing_handle, authorization_handle); - #[tokio::test] - async fn do_not_call_sender_and_receiver_until_authorized() { - execute_do_not_call_sender_and_receiver_until_authorized::().await + assert!(incoming_result.is_ok()); + assert!(outgoing_result.is_err()); + assert!(authorization_result.is_ok()); + + let data_from_incoming = data_from_incoming.try_next(); + assert!(data_from_incoming.ok().flatten().is_none()); + + let result_from_incoming = result_from_incoming.try_next(); + assert!(result_from_incoming.ok().flatten().is_none()); } } diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index 8778c2462f..75d6353064 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -11,7 +11,7 @@ use crate::network::clique::{ io::{receive_data, send_data}, protocols::{ handle_authorization, - handshake::{DefaultHandshake, Handshake}, + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, ConnectionType, ProtocolError, ResultForService, }, Data, PublicKey, SecretKey, Splittable, LOG_TARGET, @@ -93,26 +93,9 @@ pub async fn outgoing( public_key: SK::PublicKey, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { - handle_outgoing::<_, _, _, DefaultHandshake>( - stream, - secret_key, - public_key, - result_for_parent, - data_for_user, - ) - .await -} - -pub async fn handle_outgoing>( - stream: S, - secret_key: SK, - public_key: SK::PublicKey, - result_for_parent: mpsc::UnboundedSender>, - data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: LOG_TARGET, "Extending hand to {}.", public_key); - let (sender, receiver) = H::handshake_outgoing(stream, secret_key, public_key.clone()).await?; + let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; info!( target: LOG_TARGET, "Outgoing handshake with {} finished successfully.", public_key @@ -142,32 +125,9 @@ pub async fn incoming>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { - handle_incoming::<_, _, _, _, DefaultHandshake>( - stream, - secret_key, - authorizator, - result_for_parent, - data_for_user, - ) - .await -} - -pub async fn handle_incoming< - SK: SecretKey, - D: Data, - S: Splittable, - A: Authorization, - H: Handshake, ->( - stream: S, - secret_key: SK, - authorizator: A, - result_for_parent: mpsc::UnboundedSender>, - data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: LOG_TARGET, "Waiting for extended hand..."); - let (sender, receiver, public_key) = H::handshake_incoming(stream, secret_key).await?; + let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; info!( target: LOG_TARGET, "Incoming handshake with {} finished successfully.", public_key @@ -204,14 +164,13 @@ mod tests { use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; use crate::network::clique::{ - authorization::Authorization, - mock::{key, new_authorizer, MockPrelims, MockSplittable}, + authorization::{AuthorizationResult, Authorizator}, + mock::{key, MockPrelims, MockSplittable}, protocols::{ - v0::tests::{execute_do_not_call_sender_and_receiver_until_authorized, HandleIncoming}, - v1::{handle_incoming, incoming, outgoing}, - ConnectionType, Handshake, ProtocolError, ResultForService, + v1::{incoming, outgoing}, + ConnectionType, ProtocolError, }, - Data, SecretKey, Splittable, + Data, }; fn prepare() -> MockPrelims { @@ -223,7 +182,7 @@ mod tests { let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (incoming_data_for_user, data_from_incoming) = mpsc::unbounded::(); let (outgoing_data_for_user, data_from_outgoing) = mpsc::unbounded::(); - let authorizer = new_authorizer(); + let (authorizer, authorization_handler) = Authorizator::new(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), @@ -249,6 +208,7 @@ mod tests { data_from_outgoing: Some(data_from_outgoing), result_from_incoming, result_from_outgoing, + authorization_handler, } } @@ -491,37 +451,35 @@ mod tests { }; } - struct V1HandleIncoming; - - #[async_trait::async_trait] - impl HandleIncoming for V1HandleIncoming { - async fn handle_incoming( - stream: S, - secret_key: SK, - authorization: A, - result_for_parent: mpsc::UnboundedSender>, - data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> - where - SK: SecretKey, - D: Data, - S: Splittable, - A: Authorization + Send + Sync, - H: Handshake, - { - handle_incoming::<_, _, _, _, H>( - stream, - secret_key, - authorization, - result_for_parent, - data_for_user, - ) - .await - } - } - #[tokio::test] async fn do_not_call_sender_and_receiver_until_authorized() { - execute_do_not_call_sender_and_receiver_until_authorized::().await + let MockPrelims { + incoming_handle, + outgoing_handle, + mut data_from_incoming, + mut result_from_incoming, + mut authorization_handler, + .. + } = prepare::>(); + + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + let authorization_handle = authorization_handler + .handle_authorization(|_| AuthorizationResult::NotAuthorized) + .fuse(); + + // since we are returning `NotAuthorized` all three should finish hapilly + let (incoming_result, outgoing_result, authorization_result) = + tokio::join!(incoming_handle, outgoing_handle, authorization_handle); + + assert!(incoming_result.is_ok()); + assert!(outgoing_result.is_err()); + assert!(authorization_result.is_ok()); + + let data_from_incoming = data_from_incoming.try_next(); + assert!(data_from_incoming.ok().flatten().is_none()); + + let result_from_incoming = result_from_incoming.try_next(); + assert!(result_from_incoming.ok().flatten().is_none()); } } From a0cd14c4dbbbf400318f4b6ad1a78799c8caa4b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Tue, 31 Jan 2023 20:36:12 +0100 Subject: [PATCH 08/14] cleaned tests for authorization --- finality-aleph/src/network/clique/protocols/mod.rs | 2 +- finality-aleph/src/network/clique/protocols/v0/mod.rs | 2 +- finality-aleph/src/network/clique/protocols/v1/mod.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/finality-aleph/src/network/clique/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs index 2854088095..2b0852b45a 100644 --- a/finality-aleph/src/network/clique/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -13,7 +13,7 @@ mod negotiation; mod v0; mod v1; -pub use handshake::HandshakeError; +use handshake::HandshakeError; pub use negotiation::{protocol, ProtocolNegotiationError}; pub use v0::handle_authorization; diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 4ed808b21c..6564d7a6c8 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -483,7 +483,7 @@ pub mod tests { .handle_authorization(|_| AuthorizationResult::NotAuthorized) .fuse(); - // since we are returning `NotAuthorized` all three should finish hapilly + // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly let (incoming_result, outgoing_result, authorization_result) = tokio::join!(incoming_handle, outgoing_handle, authorization_handle); diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index 75d6353064..a53e48b377 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -468,7 +468,7 @@ mod tests { .handle_authorization(|_| AuthorizationResult::NotAuthorized) .fuse(); - // since we are returning `NotAuthorized` all three should finish hapilly + // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly let (incoming_result, outgoing_result, authorization_result) = tokio::join!(incoming_handle, outgoing_handle, authorization_handle); From cf289d3ba90ecab5064107bc8a47f36d86d104b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Tue, 31 Jan 2023 21:11:04 +0100 Subject: [PATCH 09/14] removed the Authorization api - switched bare channels --- .../src/network/clique/authorization.rs | 165 ------------------ finality-aleph/src/network/clique/incoming.rs | 7 +- finality-aleph/src/network/clique/mock.rs | 3 +- finality-aleph/src/network/clique/mod.rs | 1 - .../src/network/clique/protocols/mod.rs | 5 +- .../src/network/clique/protocols/v0/mod.rs | 64 ++++--- .../src/network/clique/protocols/v1/mod.rs | 29 +-- finality-aleph/src/network/clique/service.rs | 16 +- 8 files changed, 67 insertions(+), 223 deletions(-) delete mode 100644 finality-aleph/src/network/clique/authorization.rs diff --git a/finality-aleph/src/network/clique/authorization.rs b/finality-aleph/src/network/clique/authorization.rs deleted file mode 100644 index b3787c692c..0000000000 --- a/finality-aleph/src/network/clique/authorization.rs +++ /dev/null @@ -1,165 +0,0 @@ -use futures::{ - channel::{mpsc, oneshot}, - StreamExt, -}; - -#[derive(Debug, PartialEq, Eq)] -pub enum AuthorizatorError { - MissingService, - ServiceDisappeared, -} - -pub enum AuthorizationResult { - Authorized, - NotAuthorized, -} - -/// Allows one to authorize incoming public-keys. -#[async_trait::async_trait] -pub trait Authorization { - async fn is_authorized(&self, value: PK) -> Result; -} - -/// Used for validation of authorization requests. One should call [handle_authorization](Self::handle_authorization) and -/// provide a callback responsible for authorization. Each such call should be matched with call to -/// [Authorizator::is_authorized](Authorizator::is_authorized). -pub struct AuthorizationRequestHandler { - receiver: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, -} - -impl AuthorizationRequestHandler { - fn new(receiver: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>) -> Self { - Self { receiver } - } - - pub async fn handle_authorization AuthorizationResult>( - &mut self, - handler: F, - ) -> Result<(), AuthorizatorError> { - let (identifier, result_sender) = self - .receiver - .next() - .await - .ok_or(AuthorizatorError::MissingService)?; - - let auth_result = handler(identifier); - result_sender - .send(auth_result) - .map_err(|_| AuthorizatorError::MissingService)?; - Ok(()) - } -} - -#[derive(Clone)] -pub struct Authorizator { - sender: mpsc::UnboundedSender<(PK, oneshot::Sender)>, -} - -/// `Authorizator` is responsible for authorization of public-keys for the validator-network component. Each call to -/// [is_authorized](Authorizator::is_authorized) should be followed by a call of -/// [handle_authorization](AuthorizationHandler::handle_authorization). -impl Authorizator { - pub fn new() -> (Self, AuthorizationRequestHandler) { - let (sender, receiver) = mpsc::unbounded(); - (Self { sender }, AuthorizationRequestHandler::new(receiver)) - } -} - -#[async_trait::async_trait] -impl Authorization for Authorizator { - async fn is_authorized(&self, value: PK) -> Result { - let (sender, receiver) = oneshot::channel(); - self.sender - .unbounded_send((value, sender)) - .map_err(|_| AuthorizatorError::MissingService)?; - receiver - .await - .map_err(|_| AuthorizatorError::ServiceDisappeared) - .map(|auth_result| match auth_result { - AuthorizationResult::Authorized => true, - AuthorizationResult::NotAuthorized => false, - }) - } -} - -#[cfg(test)] -mod tests { - use futures::join; - - use crate::network::clique::{ - authorization::{Authorization, AuthorizationResult, Authorizator, AuthorizatorError}, - mock::{key, MockSecretKey}, - SecretKey, - }; - - #[tokio::test] - async fn authorization_sanity_check() { - let (authorizator, mut request_handler) = - Authorizator::<::PublicKey>::new(); - let public_key = key().0; - let (authorizator_result, request_handler_result) = join!( - authorizator.is_authorized(public_key.clone()), - request_handler.handle_authorization(|_| AuthorizationResult::Authorized), - ); - - assert!(authorizator_result.expect("Authorizator should return Ok.")); - assert!( - request_handler_result.is_ok(), - "Request handler should return Ok." - ); - - let (authorizator_result, request_handler_result) = join!( - authorizator.is_authorized(public_key), - request_handler.handle_authorization(|_| AuthorizationResult::NotAuthorized), - ); - - assert!(!authorizator_result.expect("Authorizator should return Ok.")); - assert!( - request_handler_result.is_ok(), - "Request handler should return Ok." - ); - } - - #[tokio::test] - async fn authorizator_returns_error_when_handler_is_dropped() { - let (authorizator, request_handler) = - Authorizator::<::PublicKey>::new(); - let public_key = key().0; - drop(request_handler); - let result = authorizator.is_authorized(public_key.clone()).await; - - assert_eq!(result, Err(AuthorizatorError::MissingService)) - } - - #[tokio::test] - async fn authorizator_returns_error_when_handler_disappeared() { - let (authorizator, mut request_handler) = - Authorizator::<::PublicKey>::new(); - let public_key = key().0; - let (authorizator_result, _) = join!( - authorizator.is_authorized(public_key.clone()), - tokio::spawn(async move { - request_handler - .handle_authorization(|_| panic!("handler bye bye")) - .await - }), - ); - - assert_eq!( - authorizator_result, - Err(AuthorizatorError::ServiceDisappeared) - ) - } - - #[tokio::test] - async fn authorization_request_handler_returns_error_when_all_authorizators_are_missing() { - let (authorizator, mut request_handler) = - Authorizator::<::PublicKey>::new(); - drop(authorizator); - let result = request_handler - .handle_authorization(|_| AuthorizationResult::Authorized) - .await; - - assert_eq!(result, Err(AuthorizatorError::MissingService)) - } -} diff --git a/finality-aleph/src/network/clique/incoming.rs b/finality-aleph/src/network/clique/incoming.rs index 578856f647..9b7979dc2f 100644 --- a/finality-aleph/src/network/clique/incoming.rs +++ b/finality-aleph/src/network/clique/incoming.rs @@ -1,10 +1,9 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use futures::channel::mpsc; +use futures::channel::{mpsc, oneshot}; use log::{debug, info}; use crate::network::clique::{ - authorization::Authorizator, protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, Data, PublicKey, SecretKey, Splittable, LOG_TARGET, }; @@ -41,7 +40,7 @@ async fn manage_incoming( stream: S, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - authorizator: Authorizator, + authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, ) -> Result<(), IncomingError> { debug!( target: LOG_TARGET, @@ -70,7 +69,7 @@ pub async fn incoming( stream: S, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - authorizator: Authorizator, + authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, ) { let addr = stream.peer_address_info(); if let Err(e) = manage_incoming( diff --git a/finality-aleph/src/network/clique/mock.rs b/finality-aleph/src/network/clique/mock.rs index ffbad85a44..30ddf53396 100644 --- a/finality-aleph/src/network/clique/mock.rs +++ b/finality-aleph/src/network/clique/mock.rs @@ -15,7 +15,6 @@ use log::info; use rand::Rng; use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf}; -use super::authorization::AuthorizationRequestHandler; use crate::network::{ clique::{ protocols::{ProtocolError, ResultForService}, @@ -555,5 +554,5 @@ pub struct MockPrelims { pub data_from_outgoing: Option>, pub result_from_incoming: UnboundedReceiver>, pub result_from_outgoing: UnboundedReceiver>, - pub authorization_handler: AuthorizationRequestHandler, + pub authorization_handler: mpsc::UnboundedReceiver<(MockPublicKey, oneshot::Sender)>, } diff --git a/finality-aleph/src/network/clique/mod.rs b/finality-aleph/src/network/clique/mod.rs index d40b64cc77..02e30db490 100644 --- a/finality-aleph/src/network/clique/mod.rs +++ b/finality-aleph/src/network/clique/mod.rs @@ -5,7 +5,6 @@ use tokio::io::{AsyncRead, AsyncWrite}; use crate::network::Data; -mod authorization; mod crypto; mod incoming; mod io; diff --git a/finality-aleph/src/network/clique/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs index 2b0852b45a..d965666409 100644 --- a/finality-aleph/src/network/clique/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -1,9 +1,8 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use futures::channel::mpsc; +use futures::channel::{mpsc, oneshot}; use crate::network::clique::{ - authorization::Authorizator, io::{ReceiveError, SendError}, Data, PublicKey, SecretKey, Splittable, }; @@ -110,7 +109,7 @@ impl Protocol { secret_key: SK, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - authorizator: Authorizator, + authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, ) -> Result<(), ProtocolError> { use Protocol::*; match self { diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 6564d7a6c8..44deed6940 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -1,9 +1,11 @@ -use futures::{channel::mpsc, StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; use log::{debug, info, trace, warn}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::network::clique::{ - authorization::{Authorization, AuthorizatorError}, io::{receive_data, send_data}, protocols::{ handshake::{v0_handshake_incoming, v0_handshake_outgoing}, @@ -86,10 +88,10 @@ async fn receiving( /// Performs the handshake, and then keeps sending data received from the network to the parent service. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn incoming>( +pub async fn incoming( stream: S, secret_key: SK, - authorizator: A, + authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { @@ -100,7 +102,7 @@ pub async fn incoming(authorizator, public_key.clone()) + let authorized = handle_authorization::(authorizator, public_key.clone()) .await .map_err(|_| ProtocolError::NotAuthorized)?; if !authorized { @@ -136,26 +138,27 @@ pub async fn incoming>( - authorizator: A, +pub async fn handle_authorization( + authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, public_key: SK::PublicKey, ) -> Result { - let authorization_result = authorizator.is_authorized(public_key.clone()).await; - match authorization_result { + let (sender, receiver) = oneshot::channel(); + if let Err(err) = authorizator.unbounded_send((public_key.clone(), sender)) { + warn!( + target: LOG_TARGET, + "Unable to send authorization request for public-key {}: disconnected.", + err.into_inner().0, + ); + return Err(()); + } + match receiver.await { Ok(result) => Ok(result), - Err(error) => { - match error { - AuthorizatorError::MissingService => warn!( - target: LOG_TARGET, - "Authorization service for public_key={} went missing before we called it.", - public_key - ), - AuthorizatorError::ServiceDisappeared => warn!( - target: LOG_TARGET, - "We managed to send authorization request for public_key={}, but were unable to receive an answer.", - public_key - ), - }; + Err(_) => { + warn!( + target: LOG_TARGET, + "Unable to receive an answer to authorization request for public-key {}: disconnected.", + public_key, + ); Err(()) } } @@ -166,7 +169,6 @@ pub mod tests { use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; use crate::network::clique::{ - authorization::{AuthorizationResult, Authorizator}, mock::{key, MockPrelims, MockSplittable}, protocols::{ v0::{incoming, outgoing}, @@ -183,7 +185,7 @@ pub mod tests { let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (data_for_user, data_from_incoming) = mpsc::unbounded::(); - let (authorizator, authorization_handler) = Authorizator::new(); + let (authorizator, authorization_handler) = mpsc::unbounded(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), @@ -479,9 +481,17 @@ pub mod tests { let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); - let authorization_handle = authorization_handler - .handle_authorization(|_| AuthorizationResult::NotAuthorized) - .fuse(); + let authorization_handle = async move { + let (_, response_sender) = authorization_handler + .next() + .await + .expect("We should recieve at least one authorization request."); + response_sender + .send(false) + .expect("We should be able to send back an authorization response."); + Result::<(), ()>::Ok(()) + } + .fuse(); // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly let (incoming_result, outgoing_result, authorization_result) = diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index a53e48b377..41c0a45e93 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -1,5 +1,8 @@ use codec::{Decode, Encode}; -use futures::{channel::mpsc, StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; use log::{debug, info, trace, warn}; use tokio::{ io::{AsyncRead, AsyncWrite}, @@ -7,7 +10,6 @@ use tokio::{ }; use crate::network::clique::{ - authorization::Authorization, io::{receive_data, send_data}, protocols::{ handle_authorization, @@ -119,10 +121,10 @@ pub async fn outgoing( /// Performs the incoming handshake, and then manages a connection sending and receiving data. /// Exits on parent request (when the data source is dropped), or in case of broken or dead /// network connection. -pub async fn incoming>( +pub async fn incoming( stream: S, secret_key: SK, - authorizator: A, + authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { @@ -133,7 +135,7 @@ pub async fn incoming(authorizator, public_key.clone()) + let authorized = handle_authorization::(authorizator, public_key.clone()) .await .map_err(|_| ProtocolError::NotAuthorized)?; if !authorized { @@ -164,7 +166,6 @@ mod tests { use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; use crate::network::clique::{ - authorization::{AuthorizationResult, Authorizator}, mock::{key, MockPrelims, MockSplittable}, protocols::{ v1::{incoming, outgoing}, @@ -182,7 +183,7 @@ mod tests { let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (incoming_data_for_user, data_from_incoming) = mpsc::unbounded::(); let (outgoing_data_for_user, data_from_outgoing) = mpsc::unbounded::(); - let (authorizer, authorization_handler) = Authorizator::new(); + let (authorizer, authorization_handler) = mpsc::unbounded(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), @@ -464,9 +465,17 @@ mod tests { let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); - let authorization_handle = authorization_handler - .handle_authorization(|_| AuthorizationResult::NotAuthorized) - .fuse(); + let authorization_handle = async move { + let (_, response_sender) = authorization_handler + .next() + .await + .expect("We should recieve at least one authorization request."); + response_sender + .send(false) + .expect("We should be able to send back an authorization response."); + Result::<(), ()>::Ok(()) + } + .fuse(); // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly let (incoming_result, outgoing_result, authorization_result) = diff --git a/finality-aleph/src/network/clique/service.rs b/finality-aleph/src/network/clique/service.rs index 9ca09c0f99..a6b2cd1b08 100644 --- a/finality-aleph/src/network/clique/service.rs +++ b/finality-aleph/src/network/clique/service.rs @@ -10,7 +10,6 @@ use tokio::time; use crate::{ network::{ clique::{ - authorization::{AuthorizationResult, Authorizator}, incoming::incoming, manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, @@ -153,7 +152,7 @@ where &self, stream: NL::Connection, result_for_parent: mpsc::UnboundedSender>, - authorizator: Authorizator, + authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, ) { let secret_key = self.secret_key.clone(); let next_to_interface = self.next_to_interface.clone(); @@ -242,7 +241,7 @@ where pub async fn run(mut self, mut exit: oneshot::Receiver<()>) { let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); let (result_for_parent, mut worker_results) = mpsc::unbounded(); - let (authorizator, mut authorization_handler) = Authorizator::new(); + let (authorizator, mut authorization_handler) = mpsc::unbounded(); use ServiceCommand::*; loop { let listener = &mut self.listener; @@ -287,14 +286,9 @@ where } }, }, - result = authorization_handler.handle_authorization(|public_key| { - if self.manager.is_authorized(&public_key) { - AuthorizationResult::Authorized - } else { - AuthorizationResult::NotAuthorized - } - }) => { - if result.is_err() { + Some((public_key, response_channel)) = authorization_handler.next() => { + let authorization_result = self.manager.is_authorized(&public_key); + if response_channel.send(authorization_result).is_err() { warn!(target: LOG_TARGET, "Other side of the Authorization Service is already closed."); } }, From 691dc29bd182fd6f45f1aeaef1746509b8f5d9bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Tue, 31 Jan 2023 21:19:21 +0100 Subject: [PATCH 10/14] refactored out names of the authorizator --- finality-aleph/src/network/clique/incoming.rs | 8 ++++---- finality-aleph/src/network/clique/mock.rs | 2 +- .../src/network/clique/protocols/mod.rs | 9 ++++++--- .../src/network/clique/protocols/v0/mod.rs | 18 +++++++++--------- .../src/network/clique/protocols/v1/mod.rs | 14 +++++++------- finality-aleph/src/network/clique/service.rs | 13 ++++++++----- 6 files changed, 35 insertions(+), 29 deletions(-) diff --git a/finality-aleph/src/network/clique/incoming.rs b/finality-aleph/src/network/clique/incoming.rs index 9b7979dc2f..b7fcaa0c0a 100644 --- a/finality-aleph/src/network/clique/incoming.rs +++ b/finality-aleph/src/network/clique/incoming.rs @@ -40,7 +40,7 @@ async fn manage_incoming( stream: S, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, ) -> Result<(), IncomingError> { debug!( target: LOG_TARGET, @@ -54,7 +54,7 @@ async fn manage_incoming( secret_key, result_for_parent, data_for_user, - authorizator, + authorization_requests_sender, ) .await?) } @@ -69,7 +69,7 @@ pub async fn incoming( stream: S, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, ) { let addr = stream.peer_address_info(); if let Err(e) = manage_incoming( @@ -77,7 +77,7 @@ pub async fn incoming( stream, result_for_parent, data_for_user, - authorizator, + authorization_requests_sender, ) .await { diff --git a/finality-aleph/src/network/clique/mock.rs b/finality-aleph/src/network/clique/mock.rs index 30ddf53396..1c95d37439 100644 --- a/finality-aleph/src/network/clique/mock.rs +++ b/finality-aleph/src/network/clique/mock.rs @@ -554,5 +554,5 @@ pub struct MockPrelims { pub data_from_outgoing: Option>, pub result_from_incoming: UnboundedReceiver>, pub result_from_outgoing: UnboundedReceiver>, - pub authorization_handler: mpsc::UnboundedReceiver<(MockPublicKey, oneshot::Sender)>, + pub authorization_requests: mpsc::UnboundedReceiver<(MockPublicKey, oneshot::Sender)>, } diff --git a/finality-aleph/src/network/clique/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs index d965666409..bf22755d11 100644 --- a/finality-aleph/src/network/clique/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -109,7 +109,10 @@ impl Protocol { secret_key: SK, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, + authorization_requests_sender: mpsc::UnboundedSender<( + SK::PublicKey, + oneshot::Sender, + )>, ) -> Result<(), ProtocolError> { use Protocol::*; match self { @@ -117,7 +120,7 @@ impl Protocol { v0::incoming( stream, secret_key, - authorizator, + authorization_requests_sender, result_for_parent, data_for_user, ) @@ -127,7 +130,7 @@ impl Protocol { v1::incoming( stream, secret_key, - authorizator, + authorization_requests_sender, result_for_parent, data_for_user, ) diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 44deed6940..5f418be0d4 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -91,7 +91,7 @@ async fn receiving( pub async fn incoming( stream: S, secret_key: SK, - authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { @@ -102,7 +102,7 @@ pub async fn incoming( "Incoming handshake with {} finished successfully.", public_key ); - let authorized = handle_authorization::(authorizator, public_key.clone()) + let authorized = handle_authorization::(authorization_requests_sender, public_key.clone()) .await .map_err(|_| ProtocolError::NotAuthorized)?; if !authorized { @@ -139,11 +139,11 @@ pub async fn incoming( } pub async fn handle_authorization( - authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, public_key: SK::PublicKey, ) -> Result { let (sender, receiver) = oneshot::channel(); - if let Err(err) = authorizator.unbounded_send((public_key.clone(), sender)) { + if let Err(err) = authorization_requests_sender.unbounded_send((public_key.clone(), sender)) { warn!( target: LOG_TARGET, "Unable to send authorization request for public-key {}: disconnected.", @@ -185,11 +185,11 @@ pub mod tests { let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (data_for_user, data_from_incoming) = mpsc::unbounded::(); - let (authorizator, authorization_handler) = mpsc::unbounded(); + let (authorization_requests_sender, authorization_requests) = mpsc::unbounded(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), - authorizator, + authorization_requests_sender, incoming_result_for_service, data_for_user, )); @@ -210,7 +210,7 @@ pub mod tests { data_from_outgoing: None, result_from_incoming, result_from_outgoing, - authorization_handler, + authorization_requests, } } @@ -475,14 +475,14 @@ pub mod tests { outgoing_handle, mut data_from_incoming, mut result_from_incoming, - mut authorization_handler, + mut authorization_requests, .. } = prepare::>(); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); let authorization_handle = async move { - let (_, response_sender) = authorization_handler + let (_, response_sender) = authorization_requests .next() .await .expect("We should recieve at least one authorization request."); diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index 41c0a45e93..81fd039326 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -124,7 +124,7 @@ pub async fn outgoing( pub async fn incoming( stream: S, secret_key: SK, - authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { @@ -135,7 +135,7 @@ pub async fn incoming( "Incoming handshake with {} finished successfully.", public_key ); - let authorized = handle_authorization::(authorizator, public_key.clone()) + let authorized = handle_authorization::(authorization_requests_sender, public_key.clone()) .await .map_err(|_| ProtocolError::NotAuthorized)?; if !authorized { @@ -183,11 +183,11 @@ mod tests { let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (incoming_data_for_user, data_from_incoming) = mpsc::unbounded::(); let (outgoing_data_for_user, data_from_outgoing) = mpsc::unbounded::(); - let (authorizer, authorization_handler) = mpsc::unbounded(); + let (authorization_requests_sender, authorization_requests) = mpsc::unbounded(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), - authorizer, + authorization_requests_sender, incoming_result_for_service, incoming_data_for_user, )); @@ -209,7 +209,7 @@ mod tests { data_from_outgoing: Some(data_from_outgoing), result_from_incoming, result_from_outgoing, - authorization_handler, + authorization_requests, } } @@ -459,14 +459,14 @@ mod tests { outgoing_handle, mut data_from_incoming, mut result_from_incoming, - mut authorization_handler, + mut authorization_requests, .. } = prepare::>(); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); let authorization_handle = async move { - let (_, response_sender) = authorization_handler + let (_, response_sender) = authorization_requests .next() .await .expect("We should recieve at least one authorization request."); diff --git a/finality-aleph/src/network/clique/service.rs b/finality-aleph/src/network/clique/service.rs index a6b2cd1b08..609651bdb3 100644 --- a/finality-aleph/src/network/clique/service.rs +++ b/finality-aleph/src/network/clique/service.rs @@ -152,7 +152,10 @@ where &self, stream: NL::Connection, result_for_parent: mpsc::UnboundedSender>, - authorizator: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, + authorization_requests_sender: mpsc::UnboundedSender<( + SK::PublicKey, + oneshot::Sender, + )>, ) { let secret_key = self.secret_key.clone(); let next_to_interface = self.next_to_interface.clone(); @@ -163,7 +166,7 @@ where stream, result_for_parent, next_to_interface, - authorizator, + authorization_requests_sender, ) .await; }); @@ -241,14 +244,14 @@ where pub async fn run(mut self, mut exit: oneshot::Receiver<()>) { let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); let (result_for_parent, mut worker_results) = mpsc::unbounded(); - let (authorizator, mut authorization_handler) = mpsc::unbounded(); + let (authorization_requests_sender, mut authorization_requests) = mpsc::unbounded(); use ServiceCommand::*; loop { let listener = &mut self.listener; tokio::select! { // got new incoming connection from the listener - spawn an incoming worker maybe_stream = listener.accept() => match maybe_stream { - Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone(), authorizator.clone()), + Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone(), authorization_requests_sender.clone()), Err(e) => warn!(target: LOG_TARGET, "Listener failed to accept connection: {}", e), }, // got a new command from the interface @@ -286,7 +289,7 @@ where } }, }, - Some((public_key, response_channel)) = authorization_handler.next() => { + Some((public_key, response_channel)) = authorization_requests.next() => { let authorization_result = self.manager.is_authorized(&public_key); if response_channel.send(authorization_result).is_err() { warn!(target: LOG_TARGET, "Other side of the Authorization Service is already closed."); From fcb02a56d87f4adf135b4f80fa60d66fa8bb6155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Wed, 1 Feb 2023 14:21:28 +0100 Subject: [PATCH 11/14] fixed tests after refactoring the authorizator --- .../src/network/clique/protocols/v0/mod.rs | 70 +++++++++++++++---- .../src/network/clique/protocols/v1/mod.rs | 66 +++++++++++++---- 2 files changed, 110 insertions(+), 26 deletions(-) diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 5f418be0d4..787328d1c2 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -166,7 +166,10 @@ pub async fn handle_authorization( #[cfg(test)] pub mod tests { - use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; + use futures::{ + channel::{mpsc, oneshot}, + pin_mut, Future, FutureExt, StreamExt, + }; use crate::network::clique::{ mock::{key, MockPrelims, MockSplittable}, @@ -214,6 +217,39 @@ pub mod tests { } } + fn handle_authorization( + mut authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + handler: impl FnOnce(PK) -> bool + Send + 'static, + ) -> impl Future> { + tokio::spawn(async move { + let (public_key, response_sender) = authorization_requests + .next() + .await + .expect("We should recieve at least one authorization request."); + let authorization_result = handler(public_key); + response_sender + .send(authorization_result) + .expect("We should be able to send back an authorization response."); + Result::<(), ()>::Ok(()) + }) + .map(|result| match result { + Ok(ok) => ok, + Err(_) => Err(()), + }) + } + + fn all_pass_authorization_handler( + authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + ) -> impl Future> { + handle_authorization(authorization_requests, |_| true) + } + + fn no_go_authorization_handler( + authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + ) -> impl Future> { + handle_authorization(authorization_requests, |_| false) + } + #[tokio::test] async fn send_data() { let MockPrelims { @@ -222,8 +258,10 @@ pub mod tests { mut data_from_incoming, result_from_incoming: _result_from_incoming, mut result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); @@ -269,8 +307,10 @@ pub mod tests { data_from_incoming: _data_from_incoming, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); @@ -298,8 +338,10 @@ pub mod tests { data_from_incoming: _data_from_incoming, result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(result_from_incoming); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); @@ -323,8 +365,10 @@ pub mod tests { data_from_incoming, result_from_incoming: _result_from_incoming, mut result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(data_from_incoming); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); @@ -361,8 +405,10 @@ pub mod tests { data_from_incoming: _data_from_incoming, result_from_incoming: _result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(outgoing_handle); match incoming_handle.await { Err(ProtocolError::HandshakeError(_)) => (), @@ -379,8 +425,10 @@ pub mod tests { data_from_incoming: _data_from_incoming, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let incoming_handle = incoming_handle.fuse(); pin_mut!(incoming_handle); let (_, _exit, connection_type) = tokio::select! { @@ -405,8 +453,10 @@ pub mod tests { data_from_incoming: _data_from_incoming, result_from_incoming: _result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(incoming_handle); match outgoing_handle.await { Err(ProtocolError::HandshakeError(_)) => (), @@ -423,8 +473,10 @@ pub mod tests { data_from_incoming: _data_from_incoming, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); let (_, _exit, connection_type) = tokio::select! { @@ -451,8 +503,10 @@ pub mod tests { data_from_incoming: _data_from_incoming, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); let (_, _exit, connection_type) = tokio::select! { @@ -475,23 +529,13 @@ pub mod tests { outgoing_handle, mut data_from_incoming, mut result_from_incoming, - mut authorization_requests, + authorization_requests, .. } = prepare::>(); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); - let authorization_handle = async move { - let (_, response_sender) = authorization_requests - .next() - .await - .expect("We should recieve at least one authorization request."); - response_sender - .send(false) - .expect("We should be able to send back an authorization response."); - Result::<(), ()>::Ok(()) - } - .fuse(); + let authorization_handle = no_go_authorization_handler(authorization_requests); // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly let (incoming_result, outgoing_result, authorization_result) = diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index 81fd039326..44b632ea4d 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -163,7 +163,10 @@ pub async fn incoming( #[cfg(test)] mod tests { - use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; + use futures::{ + channel::{mpsc, oneshot}, + pin_mut, Future, FutureExt, StreamExt, + }; use crate::network::clique::{ mock::{key, MockPrelims, MockSplittable}, @@ -213,6 +216,39 @@ mod tests { } } + fn handle_authorization( + mut authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + handler: impl FnOnce(PK) -> bool + Send + 'static, + ) -> impl Future> { + tokio::spawn(async move { + let (public_key, response_sender) = authorization_requests + .next() + .await + .expect("We should recieve at least one authorization request."); + let authorization_result = handler(public_key); + response_sender + .send(authorization_result) + .expect("We should be able to send back an authorization response."); + Result::<(), ()>::Ok(()) + }) + .map(|result| match result { + Ok(ok) => ok, + Err(_) => Err(()), + }) + } + + fn all_pass_authorization_handler( + authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + ) -> impl Future> { + handle_authorization(authorization_requests, |_| true) + } + + fn no_go_authorization_handler( + authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + ) -> impl Future> { + handle_authorization(authorization_requests, |_| false) + } + #[tokio::test] async fn send_data() { let MockPrelims { @@ -222,6 +258,7 @@ mod tests { data_from_outgoing, mut result_from_incoming, mut result_from_outgoing, + authorization_requests, .. } = prepare::>(); let mut data_from_outgoing = data_from_outgoing.expect("No data from outgoing!"); @@ -229,6 +266,7 @@ mod tests { let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); pin_mut!(outgoing_handle); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let _data_for_outgoing = tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), @@ -301,12 +339,14 @@ mod tests { data_from_outgoing: _data_from_outgoing, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); pin_mut!(outgoing_handle); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), @@ -331,6 +371,7 @@ mod tests { data_from_outgoing: _data_from_outgoing, result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); std::mem::drop(result_from_incoming); @@ -338,6 +379,7 @@ mod tests { let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); pin_mut!(outgoing_handle); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); tokio::select! { e = &mut incoming_handle => match e { Err(ProtocolError::NoParentConnection) => (), @@ -357,6 +399,7 @@ mod tests { data_from_outgoing: _data_from_outgoing, result_from_incoming: _result_from_incoming, mut result_from_outgoing, + authorization_requests, .. } = prepare::>(); std::mem::drop(data_from_incoming); @@ -364,6 +407,7 @@ mod tests { let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); pin_mut!(outgoing_handle); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let _data_for_outgoing = tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), @@ -396,8 +440,10 @@ mod tests { data_from_outgoing: _data_from_outgoing, result_from_incoming: _result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(outgoing_handle); match incoming_handle.await { Err(ProtocolError::HandshakeError(_)) => (), @@ -415,8 +461,10 @@ mod tests { data_from_outgoing: _data_from_outgoing, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let incoming_handle = incoming_handle.fuse(); pin_mut!(incoming_handle); let (_, _exit, connection_type) = tokio::select! { @@ -442,8 +490,10 @@ mod tests { data_from_outgoing: _data_from_outgoing, result_from_incoming: _result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(incoming_handle); match outgoing_handle.await { Err(ProtocolError::HandshakeError(_)) => (), @@ -459,23 +509,13 @@ mod tests { outgoing_handle, mut data_from_incoming, mut result_from_incoming, - mut authorization_requests, + authorization_requests, .. } = prepare::>(); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); - let authorization_handle = async move { - let (_, response_sender) = authorization_requests - .next() - .await - .expect("We should recieve at least one authorization request."); - response_sender - .send(false) - .expect("We should be able to send back an authorization response."); - Result::<(), ()>::Ok(()) - } - .fuse(); + let authorization_handle = no_go_authorization_handler(authorization_requests); // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly let (incoming_result, outgoing_result, authorization_result) = From 65fec32a0827f1142d84e86a74d58a6fe27a412e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Thu, 2 Feb 2023 12:00:46 +0100 Subject: [PATCH 12/14] simplified `handle_authorization` in clique network --- .../src/network/clique/protocols/mod.rs | 1 - .../src/network/clique/protocols/v0/mod.rs | 42 +++++-------------- .../src/network/clique/protocols/v1/mod.rs | 15 ++----- 3 files changed, 15 insertions(+), 43 deletions(-) diff --git a/finality-aleph/src/network/clique/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs index bf22755d11..6379da5135 100644 --- a/finality-aleph/src/network/clique/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -14,7 +14,6 @@ mod v1; use handshake::HandshakeError; pub use negotiation::{protocol, ProtocolNegotiationError}; -pub use v0::handle_authorization; pub type Version = u32; diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 787328d1c2..3470307caf 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -2,7 +2,7 @@ use futures::{ channel::{mpsc, oneshot}, StreamExt, }; -use log::{debug, info, trace, warn}; +use log::{debug, info, trace}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::network::clique::{ @@ -102,15 +102,8 @@ pub async fn incoming( "Incoming handshake with {} finished successfully.", public_key ); - let authorized = handle_authorization::(authorization_requests_sender, public_key.clone()) - .await - .map_err(|_| ProtocolError::NotAuthorized)?; - if !authorized { - warn!( - target: LOG_TARGET, - "public_key={} was not authorized.", public_key - ); - return Ok(()); + if !check_authorization::(authorization_requests_sender, public_key.clone()).await? { + return Err(ProtocolError::NotAuthorized); } let (tx_exit, mut exit) = mpsc::unbounded(); @@ -138,30 +131,17 @@ pub async fn incoming( } } -pub async fn handle_authorization( +pub async fn check_authorization( authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, public_key: SK::PublicKey, -) -> Result { +) -> Result> { let (sender, receiver) = oneshot::channel(); - if let Err(err) = authorization_requests_sender.unbounded_send((public_key.clone(), sender)) { - warn!( - target: LOG_TARGET, - "Unable to send authorization request for public-key {}: disconnected.", - err.into_inner().0, - ); - return Err(()); - } - match receiver.await { - Ok(result) => Ok(result), - Err(_) => { - warn!( - target: LOG_TARGET, - "Unable to receive an answer to authorization request for public-key {}: disconnected.", - public_key, - ); - Err(()) - } - } + authorization_requests_sender + .unbounded_send((public_key.clone(), sender)) + .map_err(|_| ProtocolError::NoParentConnection)?; + receiver + .await + .map_err(|_| ProtocolError::NoParentConnection) } #[cfg(test)] diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index 44b632ea4d..c909aad8b3 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -3,7 +3,7 @@ use futures::{ channel::{mpsc, oneshot}, StreamExt, }; -use log::{debug, info, trace, warn}; +use log::{debug, info, trace}; use tokio::{ io::{AsyncRead, AsyncWrite}, time::{timeout, Duration}, @@ -12,8 +12,8 @@ use tokio::{ use crate::network::clique::{ io::{receive_data, send_data}, protocols::{ - handle_authorization, handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + v0::check_authorization, ConnectionType, ProtocolError, ResultForService, }, Data, PublicKey, SecretKey, Splittable, LOG_TARGET, @@ -135,15 +135,8 @@ pub async fn incoming( "Incoming handshake with {} finished successfully.", public_key ); - let authorized = handle_authorization::(authorization_requests_sender, public_key.clone()) - .await - .map_err(|_| ProtocolError::NotAuthorized)?; - if !authorized { - warn!( - target: LOG_TARGET, - "public_key={} was not authorized.", public_key - ); - return Ok(()); + if !check_authorization::(authorization_requests_sender, public_key.clone()).await? { + return Err(ProtocolError::NotAuthorized); } let (data_for_network, data_from_user) = mpsc::unbounded(); From 5f7ca130959481b8514360cbcbdfc892db2b6f75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Thu, 2 Feb 2023 12:03:36 +0100 Subject: [PATCH 13/14] review changes related with authorization in clique network --- finality-aleph/src/network/clique/protocols/mod.rs | 2 +- finality-aleph/src/network/clique/service.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/finality-aleph/src/network/clique/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs index 6379da5135..7a35124606 100644 --- a/finality-aleph/src/network/clique/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -71,7 +71,7 @@ impl Display for ProtocolError { CardiacArrest => write!(f, "heartbeat stopped"), NoParentConnection => write!(f, "cannot send result to service"), NoUserConnection => write!(f, "cannot send data to user"), - NotAuthorized => write!(f, "user not authorized"), + NotAuthorized => write!(f, "peer not authorized"), } } } diff --git a/finality-aleph/src/network/clique/service.rs b/finality-aleph/src/network/clique/service.rs index 609651bdb3..7905cc0b07 100644 --- a/finality-aleph/src/network/clique/service.rs +++ b/finality-aleph/src/network/clique/service.rs @@ -247,10 +247,9 @@ where let (authorization_requests_sender, mut authorization_requests) = mpsc::unbounded(); use ServiceCommand::*; loop { - let listener = &mut self.listener; tokio::select! { // got new incoming connection from the listener - spawn an incoming worker - maybe_stream = listener.accept() => match maybe_stream { + maybe_stream = self.listener.accept() => match maybe_stream { Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone(), authorization_requests_sender.clone()), Err(e) => warn!(target: LOG_TARGET, "Listener failed to accept connection: {}", e), }, From 77fd69450849891783468b280961dadd7482b72f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Thu, 2 Feb 2023 12:15:22 +0100 Subject: [PATCH 14/14] cleaned tests fot the authorization in the clique network --- finality-aleph/src/network/clique/protocols/v0/mod.rs | 5 ++--- finality-aleph/src/network/clique/protocols/v1/mod.rs | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 3470307caf..e2c996efe4 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -513,16 +513,15 @@ pub mod tests { .. } = prepare::>(); - let incoming_handle = incoming_handle.fuse(); - let outgoing_handle = outgoing_handle.fuse(); let authorization_handle = no_go_authorization_handler(authorization_requests); // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly let (incoming_result, outgoing_result, authorization_result) = tokio::join!(incoming_handle, outgoing_handle, authorization_handle); - assert!(incoming_result.is_ok()); + assert!(incoming_result.is_err()); assert!(outgoing_result.is_err()); + // this also verifies if it was called at all assert!(authorization_result.is_ok()); let data_from_incoming = data_from_incoming.try_next(); diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index c909aad8b3..713c5b05e4 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -506,16 +506,15 @@ mod tests { .. } = prepare::>(); - let incoming_handle = incoming_handle.fuse(); - let outgoing_handle = outgoing_handle.fuse(); let authorization_handle = no_go_authorization_handler(authorization_requests); // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly let (incoming_result, outgoing_result, authorization_result) = tokio::join!(incoming_handle, outgoing_handle, authorization_handle); - assert!(incoming_result.is_ok()); + assert!(incoming_result.is_err()); assert!(outgoing_result.is_err()); + // this also verifies if it was called at all assert!(authorization_result.is_ok()); let data_from_incoming = data_from_incoming.try_next();