Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-1897 validator-network authorization #892

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions finality-aleph/src/network/clique/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use log::{debug, info};

use crate::network::clique::{
Expand Down Expand Up @@ -40,6 +40,7 @@ async fn manage_incoming<SK: SecretKey, D: Data, S: Splittable>(
stream: S,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
) -> Result<(), IncomingError<SK::PublicKey>> {
debug!(
target: LOG_TARGET,
Expand All @@ -48,7 +49,13 @@ async fn manage_incoming<SK: SecretKey, D: Data, S: Splittable>(
let (stream, protocol) = protocol(stream).await?;
debug!(target: LOG_TARGET, "Negotiated protocol, running.");
Ok(protocol
.manage_incoming(stream, secret_key, result_for_parent, data_for_user)
.manage_incoming(
stream,
secret_key,
result_for_parent,
data_for_user,
authorization_requests_sender,
)
.await?)
}

Expand All @@ -62,9 +69,18 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
stream: S,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
) {
let addr = stream.peer_address_info();
if let Err(e) = manage_incoming(secret_key, stream, result_for_parent, data_for_user).await {
if let Err(e) = manage_incoming(
secret_key,
stream,
result_for_parent,
data_for_user,
authorization_requests_sender,
)
.await
{
info!(
target: LOG_TARGET,
"Incoming connection from {} failed: {}.", addr, e
Expand Down
4 changes: 4 additions & 0 deletions finality-aleph/src/network/clique/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
pub fn status_report(&self) -> impl Display {
ManagerStatus::new(self)
}

pub fn is_authorized(&self, public_key: &PK) -> bool {
self.wanted.interested(public_key)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions finality-aleph/src/network/clique/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,4 +554,5 @@ pub struct MockPrelims<D> {
pub data_from_outgoing: Option<UnboundedReceiver<D>>,
pub result_from_incoming: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
pub result_from_outgoing: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
pub authorization_requests: mpsc::UnboundedReceiver<(MockPublicKey, oneshot::Sender<bool>)>,
}
33 changes: 29 additions & 4 deletions finality-aleph/src/network/clique/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};

use crate::network::clique::{
io::{ReceiveError, SendError},
Expand Down Expand Up @@ -57,6 +57,8 @@ pub enum ProtocolError<PK: PublicKey> {
NoParentConnection,
/// Data channel closed.
NoUserConnection,
/// Authorization error.
NotAuthorized,
}

impl<PK: PublicKey> Display for ProtocolError<PK> {
Expand All @@ -69,6 +71,7 @@ impl<PK: PublicKey> Display for ProtocolError<PK> {
CardiacArrest => write!(f, "heartbeat stopped"),
NoParentConnection => write!(f, "cannot send result to service"),
NoUserConnection => write!(f, "cannot send data to user"),
NotAuthorized => write!(f, "peer not authorized"),
}
}
}
Expand Down Expand Up @@ -103,13 +106,35 @@ impl Protocol {
&self,
stream: S,
secret_key: SK,
result_for_service: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
authorization_requests_sender: mpsc::UnboundedSender<(
SK::PublicKey,
oneshot::Sender<bool>,
)>,
) -> Result<(), ProtocolError<SK::PublicKey>> {
use Protocol::*;
match self {
V0 => v0::incoming(stream, secret_key, result_for_service, data_for_user).await,
V1 => v1::incoming(stream, secret_key, result_for_service, data_for_user).await,
V0 => {
v0::incoming(
stream,
secret_key,
authorization_requests_sender,
result_for_parent,
data_for_user,
)
.await
}
V1 => {
v1::incoming(
stream,
secret_key,
authorization_requests_sender,
result_for_parent,
data_for_user,
)
.await
}
}
}

Expand Down
119 changes: 114 additions & 5 deletions finality-aleph/src/network/clique/protocols/v0/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use futures::{channel::mpsc, StreamExt};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, info, trace};
use tokio::io::{AsyncRead, AsyncWrite};

Expand Down Expand Up @@ -88,6 +91,7 @@ async fn receiving<PK: PublicKey, D: Data, S: AsyncRead + Unpin + Send>(
pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
stream: S,
secret_key: SK,
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
) -> Result<(), ProtocolError<SK::PublicKey>> {
Expand All @@ -98,6 +102,10 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
"Incoming handshake with {} finished successfully.", public_key
);

if !check_authorization::<SK>(authorization_requests_sender, public_key.clone()).await? {
return Err(ProtocolError::NotAuthorized);
}

let (tx_exit, mut exit) = mpsc::unbounded();
result_for_parent
.unbounded_send((
Expand All @@ -123,14 +131,32 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
}
}

pub async fn check_authorization<SK: SecretKey>(
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
public_key: SK::PublicKey,
) -> Result<bool, ProtocolError<SK::PublicKey>> {
let (sender, receiver) = oneshot::channel();
authorization_requests_sender
.unbounded_send((public_key.clone(), sender))
.map_err(|_| ProtocolError::NoParentConnection)?;
receiver
.await
.map_err(|_| ProtocolError::NoParentConnection)
}

#[cfg(test)]
mod tests {
use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
pub mod tests {
use futures::{
channel::{mpsc, oneshot},
pin_mut, Future, FutureExt, StreamExt,
};

use super::{incoming, outgoing, ProtocolError};
use crate::network::clique::{
mock::{key, MockPrelims, MockSplittable},
protocols::ConnectionType,
protocols::{
v0::{incoming, outgoing},
ConnectionType, ProtocolError,
},
Data,
};

Expand All @@ -142,9 +168,11 @@ mod tests {
let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded();
let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded();
let (data_for_user, data_from_incoming) = mpsc::unbounded::<D>();
let (authorization_requests_sender, authorization_requests) = mpsc::unbounded();
let incoming_handle = Box::pin(incoming(
stream_incoming,
pen_incoming.clone(),
authorization_requests_sender,
incoming_result_for_service,
data_for_user,
));
Expand All @@ -165,9 +193,43 @@ mod tests {
data_from_outgoing: None,
result_from_incoming,
result_from_outgoing,
authorization_requests,
}
}

fn handle_authorization<PK: Send + 'static>(
mut authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender<bool>)>,
handler: impl FnOnce(PK) -> bool + Send + 'static,
) -> impl Future<Output = Result<(), ()>> {
tokio::spawn(async move {
let (public_key, response_sender) = authorization_requests
.next()
.await
.expect("We should recieve at least one authorization request.");
let authorization_result = handler(public_key);
response_sender
.send(authorization_result)
.expect("We should be able to send back an authorization response.");
Result::<(), ()>::Ok(())
})
.map(|result| match result {
Ok(ok) => ok,
Err(_) => Err(()),
})
}

fn all_pass_authorization_handler<PK: Send + 'static>(
authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender<bool>)>,
) -> impl Future<Output = Result<(), ()>> {
handle_authorization(authorization_requests, |_| true)
}

fn no_go_authorization_handler<PK: Send + 'static>(
authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender<bool>)>,
) -> impl Future<Output = Result<(), ()>> {
handle_authorization(authorization_requests, |_| false)
}

#[tokio::test]
async fn send_data() {
let MockPrelims {
Expand All @@ -176,8 +238,10 @@ mod tests {
mut data_from_incoming,
result_from_incoming: _result_from_incoming,
mut result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(incoming_handle);
Expand Down Expand Up @@ -223,8 +287,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(incoming_handle);
Expand Down Expand Up @@ -252,8 +318,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
std::mem::drop(result_from_incoming);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
Expand All @@ -277,8 +345,10 @@ mod tests {
data_from_incoming,
result_from_incoming: _result_from_incoming,
mut result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
std::mem::drop(data_from_incoming);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
Expand Down Expand Up @@ -315,8 +385,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
result_from_incoming: _result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
std::mem::drop(outgoing_handle);
match incoming_handle.await {
Err(ProtocolError::HandshakeError(_)) => (),
Expand All @@ -333,8 +405,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let incoming_handle = incoming_handle.fuse();
pin_mut!(incoming_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -359,8 +433,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
result_from_incoming: _result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
std::mem::drop(incoming_handle);
match outgoing_handle.await {
Err(ProtocolError::HandshakeError(_)) => (),
Expand All @@ -377,8 +453,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(outgoing_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -405,8 +483,10 @@ mod tests {
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
result_from_outgoing: _result_from_outgoing,
authorization_requests,
..
} = prepare::<Vec<i32>>();
let _authorization_handle = all_pass_authorization_handler(authorization_requests);
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(outgoing_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -421,4 +501,33 @@ mod tests {
Ok(_) => panic!("successfully finished when connection dead"),
};
}

#[tokio::test]
async fn do_not_call_sender_and_receiver_until_authorized() {
let MockPrelims {
incoming_handle,
outgoing_handle,
mut data_from_incoming,
mut result_from_incoming,
authorization_requests,
..
} = prepare::<Vec<i32>>();

let authorization_handle = no_go_authorization_handler(authorization_requests);

// since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly
let (incoming_result, outgoing_result, authorization_result) =
tokio::join!(incoming_handle, outgoing_handle, authorization_handle);

assert!(incoming_result.is_err());
assert!(outgoing_result.is_err());
// this also verifies if it was called at all
assert!(authorization_result.is_ok());

let data_from_incoming = data_from_incoming.try_next();
assert!(data_from_incoming.ok().flatten().is_none());

let result_from_incoming = result_from_incoming.try_next();
assert!(result_from_incoming.ok().flatten().is_none());
}
}
Loading