Skip to content

Commit

Permalink
A0-1897 validator-network authorization (#892)
Browse files Browse the repository at this point in the history
* new authorizator service for the validator network - no send/receive data before a user is authorized

* unit-tests for the authorization api for the validator-network

* fixed path in the script for synthetic-network

* simplified authorization impl for validator network

* authorization: lint

* more verbose types for the authorization api

* - removed mocks for AsyncWrite/Read used by the the clique authorization tests
- tests for authorization in clique-network use now `prepare` instead of mocking the handshake

* cleaned tests for authorization

* removed the Authorization api - switched bare channels

* refactored out names of the authorizator

* fixed tests after refactoring the authorizator

* simplified `handle_authorization` in clique network

* review changes related with authorization in clique network

* cleaned tests fot the authorization in the clique network
  • Loading branch information
fixxxedpoint authored Feb 2, 2023
1 parent f82550c commit cd1902d
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 20 deletions.
22 changes: 19 additions & 3 deletions finality-aleph/src/network/clique/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use log::{debug, info};

use crate::network::clique::{
Expand Down Expand Up @@ -40,6 +40,7 @@ async fn manage_incoming<SK: SecretKey, D: Data, S: Splittable>(
stream: S,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
) -> Result<(), IncomingError<SK::PublicKey>> {
debug!(
target: LOG_TARGET,
Expand All @@ -48,7 +49,13 @@ async fn manage_incoming<SK: SecretKey, D: Data, S: Splittable>(
let (stream, protocol) = protocol(stream).await?;
debug!(target: LOG_TARGET, "Negotiated protocol, running.");
Ok(protocol
.manage_incoming(stream, secret_key, result_for_parent, data_for_user)
.manage_incoming(
stream,
secret_key,
result_for_parent,
data_for_user,
authorization_requests_sender,
)
.await?)
}

Expand All @@ -62,9 +69,18 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
stream: S,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
) {
let addr = stream.peer_address_info();
if let Err(e) = manage_incoming(secret_key, stream, result_for_parent, data_for_user).await {
if let Err(e) = manage_incoming(
secret_key,
stream,
result_for_parent,
data_for_user,
authorization_requests_sender,
)
.await
{
info!(
target: LOG_TARGET,
"Incoming connection from {} failed: {}.", addr, e
Expand Down
4 changes: 4 additions & 0 deletions finality-aleph/src/network/clique/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
pub fn status_report(&self) -> impl Display {
ManagerStatus::new(self)
}

pub fn is_authorized(&self, public_key: &PK) -> bool {
self.wanted.interested(public_key)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions finality-aleph/src/network/clique/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,4 +554,5 @@ pub struct MockPrelims<D> {
pub data_from_outgoing: Option<UnboundedReceiver<D>>,
pub result_from_incoming: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
pub result_from_outgoing: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
pub authorization_requests: mpsc::UnboundedReceiver<(MockPublicKey, oneshot::Sender<bool>)>,
}
33 changes: 29 additions & 4 deletions finality-aleph/src/network/clique/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};

use crate::network::clique::{
io::{ReceiveError, SendError},
Expand Down Expand Up @@ -57,6 +57,8 @@ pub enum ProtocolError<PK: PublicKey> {
NoParentConnection,
/// Data channel closed.
NoUserConnection,
/// Authorization error.
NotAuthorized,
}

impl<PK: PublicKey> Display for ProtocolError<PK> {
Expand All @@ -69,6 +71,7 @@ impl<PK: PublicKey> Display for ProtocolError<PK> {
CardiacArrest => write!(f, "heartbeat stopped"),
NoParentConnection => write!(f, "cannot send result to service"),
NoUserConnection => write!(f, "cannot send data to user"),
NotAuthorized => write!(f, "peer not authorized"),
}
}
}
Expand Down Expand Up @@ -103,13 +106,35 @@ impl Protocol {
&self,
stream: S,
secret_key: SK,
result_for_service: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
authorization_requests_sender: mpsc::UnboundedSender<(
SK::PublicKey,
oneshot::Sender<bool>,
)>,
) -> Result<(), ProtocolError<SK::PublicKey>> {
use Protocol::*;
match self {
V0 => v0::incoming(stream, secret_key, result_for_service, data_for_user).await,
V1 => v1::incoming(stream, secret_key, result_for_service, data_for_user).await,
V0 => {
v0::incoming(
stream,
secret_key,
authorization_requests_sender,
result_for_parent,
data_for_user,
)
.await
}
V1 => {
v1::incoming(
stream,
secret_key,
authorization_requests_sender,
result_for_parent,
data_for_user,
)
.await
}
}
}

Expand Down
119 changes: 114 additions & 5 deletions finality-aleph/src/network/clique/protocols/v0/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use futures::{channel::mpsc, StreamExt};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, info, trace};
use tokio::io::{AsyncRead, AsyncWrite};

Expand Down Expand Up @@ -88,6 +91,7 @@ async fn receiving<PK: PublicKey, D: Data, S: AsyncRead + Unpin + Send>(
pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
stream: S,
secret_key: SK,
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
) -> Result<(), ProtocolError<SK::PublicKey>> {
Expand All @@ -98,6 +102,10 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
"Incoming handshake with {} finished successfully.", public_key
);

if !check_authorization::<SK>(authorization_requests_sender, public_key.clone()).await? {
return Err(ProtocolError::NotAuthorized);
}

let (tx_exit, mut exit) = mpsc::unbounded();
result_for_parent
.unbounded_send((
Expand All @@ -123,14 +131,32 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
}
}

pub async fn check_authorization<SK: SecretKey>(
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
public_key: SK::PublicKey,
) -> Result<bool, ProtocolError<SK::PublicKey>> {
let (sender, receiver) = oneshot::channel();
authorization_requests_sender
.unbounded_send((public_key.clone(), sender))
.map_err(|_| ProtocolError::NoParentConnection)?;
receiver
.await
.map_err(|_| ProtocolError::NoParentConnection)
}

#[cfg(test)]
mod tests {
use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
pub mod tests {
use futures::{
channel::{mpsc, oneshot},
pin_mut, Future, FutureExt, StreamExt,
};

use super::{incoming, outgoing, ProtocolError};
use crate::network::clique::{
mock::{key, MockPrelims, MockSplittable},
protocols::ConnectionType,
protocols::{
v0::{incoming, outgoing},
ConnectionType, ProtocolError,
},
Data,
};

Expand All @@ -142,9 +168,11 @@ mod tests {
let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded();
let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded();
let (data_for_user, data_from_incoming) = mpsc::unbounded::<D>();
let (authorization_requests_sender, authorization_requests) = mpsc::unbounded();
let incoming_handle = Box::pin(incoming(
stream_incoming,
pen_incoming.clone(),
authorization_requests_sender,
incoming_result_for_service,
data_for_user,
));
Expand All @@ -165,9 +193,43 @@ mod tests {
data_from_outgoing: None,
result_from_incoming,
result_from_outgoing,
authorization_requests,
}
}

fn handle_authorization<PK: Send + 'static>(
mut authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender<bool>)>,
handler: impl FnOnce(PK) -> bool + Send + 'static,
) -> impl Future<Output = Result<(), ()>> {
tokio::spawn(async move {
let (public_key, response_sender) = authorization_requests
.next()
.await
.expect("We should recieve at least one authorization request.");
let authorization_result = handler(public_key);
response_sender
.send(authorization_result)
.expect("We should be able to send back an authorization response.");
Result::<(), ()>::Ok(())
})
.map(|result| match result {
Ok(ok) => ok,
Err(_) => Err(()),
})
}

fn all_pass_authorization_handler<PK: Send + 'static>(
authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender<bool>)>,
) -> impl Future<Output = Result<(), ()>> {
handle_authorization(authorization_requests, |_| true)
}

fn no_go_authorization_handler<PK: Send + 'static>(
authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender<bool>)>,
) -> impl Future<Output = Result<(), ()>> {
handle_authorization(authorization_requests, |_| false)
}

#[tokio::test]
async fn send_data() {
let MockPrelims {
Expand All @@ -176,8 +238,10 @@ mod tests {
mut data_from_incoming,
result_from_incoming: _result_from_incoming,
mut result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(incoming_handle);
Expand Down Expand Up @@ -223,8 +287,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(incoming_handle);
Expand Down Expand Up @@ -252,8 +318,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
std::mem::drop(result_from_incoming);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
Expand All @@ -277,8 +345,10 @@ mod tests {
data_from_incoming,
result_from_incoming: _result_from_incoming,
mut result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
std::mem::drop(data_from_incoming);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
Expand Down Expand Up @@ -315,8 +385,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
result_from_incoming: _result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
std::mem::drop(outgoing_handle);
match incoming_handle.await {
Err(ProtocolError::HandshakeError(_)) => (),
Expand All @@ -333,8 +405,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let incoming_handle = incoming_handle.fuse();
pin_mut!(incoming_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -359,8 +433,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
result_from_incoming: _result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
std::mem::drop(incoming_handle);
match outgoing_handle.await {
Err(ProtocolError::HandshakeError(_)) => (),
Expand All @@ -377,8 +453,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(outgoing_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -405,8 +483,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(outgoing_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -421,4 +501,33 @@ mod tests {
Ok(_) => panic!("successfully finished when connection dead"),
};
}

#[tokio::test]
async fn do_not_call_sender_and_receiver_until_authorized() {
let MockPrelims {
incoming_handle,
outgoing_handle,
mut data_from_incoming,
mut result_from_incoming,
authorization_requests,
..
} = prepare::<Vec<i32>>();

let authorization_handle = no_go_authorization_handler(authorization_requests);

// since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly
let (incoming_result, outgoing_result, authorization_result) =
tokio::join!(incoming_handle, outgoing_handle, authorization_handle);

assert!(incoming_result.is_err());
assert!(outgoing_result.is_err());
// this also verifies if it was called at all
assert!(authorization_result.is_ok());

let data_from_incoming = data_from_incoming.try_next();
assert!(data_from_incoming.ok().flatten().is_none());

let result_from_incoming = result_from_incoming.try_next();
assert!(result_from_incoming.ok().flatten().is_none());
}
}
Loading

0 comments on commit cd1902d

Please sign in to comment.