Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-1413: remove legacy network #719

Merged
merged 9 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,6 @@ fn setup(
),
ServiceError,
> {
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Generic));
config
.network
.extra_sets
Expand Down Expand Up @@ -246,7 +242,7 @@ fn setup(

/// Builds a new service for a full client.
pub fn new_authority(
mut config: Configuration,
config: Configuration,
aleph_config: AlephCli,
) -> Result<TaskManager, ServiceError> {
let sc_service::PartialComponents {
Expand All @@ -259,10 +255,6 @@ pub fn new_authority(
transaction_pool,
other: (block_import, justification_tx, justification_rx, mut telemetry, metrics),
} = new_partial(&config)?;
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Validator));

let backup_path = get_backup_path(
&aleph_config,
Expand Down
8 changes: 0 additions & 8 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,6 @@ pub fn peers_set_config(protocol: Protocol) -> sc_network::config::NonDefaultSet
);

config.set_config = match protocol {
// No spontaneous connections, only reserved nodes added by the network logic.
Protocol::Validator => sc_network::config::SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
},
Protocol::Generic => sc_network::config::SetConfig::default(),
Protocol::Authentication => sc_network::config::SetConfig::default(),
};
config
Expand Down
58 changes: 6 additions & 52 deletions finality-aleph/src/network/manager/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::{
};

use codec::{Decode, Encode};
use log::{debug, info, trace, warn};
use log::{debug, info, trace};

use crate::{
network::{
manager::{Authentication, SessionHandler},
DataCommand, Multiaddress, Protocol,
DataCommand, Multiaddress,
},
NodeIndex, SessionId,
};
Expand Down Expand Up @@ -54,16 +54,6 @@ fn authentication_broadcast<M: Multiaddress>(
)
}

fn response<M: Multiaddress>(
authentication: Authentication<M>,
peer_id: M::PeerId,
) -> DiscoveryCommand<M> {
(
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(peer_id, Protocol::Generic),
)
}

impl<M: Multiaddress> Discovery<M> {
/// Create a new discovery handler with the given response/broadcast cooldown.
pub fn new(cooldown: Duration) -> Self {
Expand Down Expand Up @@ -110,6 +100,9 @@ impl<M: Multiaddress> Discovery<M> {
}
}

// Responding to authentication broadcasts will be brought back here in A0-1471.
// Because of that we leave `Vec<DiscoveryCommand<M>>` even though right now it contains
// at max 1 message.
fn handle_broadcast(
&mut self,
authentication: Authentication<M>,
Expand All @@ -122,16 +115,6 @@ impl<M: Multiaddress> Discovery<M> {
}
let node_id = authentication.0.creator();
let mut messages = Vec::new();
match handler.peer_id(&node_id) {
Some(peer_id) => {
if let Some(handler_authentication) = handler.authentication() {
messages.push(response(handler_authentication, peer_id));
}
}
None => {
warn!(target: "aleph-network", "Id of correctly authenticated peer not present.")
}
}
if self.should_rebroadcast(&node_id) {
trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication);
self.last_broadcast.insert(node_id, Instant::now());
Expand Down Expand Up @@ -265,15 +248,11 @@ mod tests {
handler,
);
assert_eq!(addresses, authentication.0.addresses());
assert_eq!(commands.len(), 2);
assert_eq!(commands.len(), 1);
assert!(commands.iter().any(|command| matches!(command, (
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
) if rebroadcast_authentication == &authentication)));
assert!(commands.iter().any(|command| matches!(command, (
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(_, _),
) if *authentication == handler.authentication().unwrap())));
}

#[tokio::test]
Expand Down Expand Up @@ -307,31 +286,6 @@ mod tests {
assert!(commands.is_empty());
}

#[tokio::test]
async fn does_not_rebroadcast_quickly_but_still_responds() {
let (mut discovery, mut handlers, _) = build().await;
let authentication = handlers[1].authentication().unwrap();
let handler = &mut handlers[0];
discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
handler,
);
let (addresses, commands) = discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
handler,
);
assert_eq!(addresses.len(), authentication.0.addresses().len());
assert_eq!(
addresses[0].encode(),
authentication.0.addresses()[0].encode()
);
assert_eq!(commands.len(), 1);
assert!(matches!(&commands[0], (
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(_, _),
) if *authentication == handler.authentication().unwrap()));
}

#[tokio::test]
async fn rebroadcasts_after_cooldown() {
let (mut discovery, mut handlers, _) = build().await;
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub type Authentication<M> = (AuthData<M>, Signature);
/// The order of the data and session_id is fixed in encode and the decode expects it to be data, session_id.
/// Since data is versioned, i.e. it's encoding starts with a version number in the standardized way,
/// this will allow us to retrofit versioning here if we ever need to change this structure.
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DataInSession<D: Data> {
pub data: D,
pub session_id: SessionId,
Expand Down
28 changes: 6 additions & 22 deletions finality-aleph/src/network/manager/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
Connections, Discovery, DiscoveryMessage, NetworkData, SessionHandler,
SessionHandlerError,
},
ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId, Protocol,
ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId,
},
MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL,
};
Expand Down Expand Up @@ -448,22 +448,12 @@ impl<NI: NetworkIdentity, D: Data> Service<NI, D> {
Recipient::Everyone => (0..handler.node_count().0)
.map(NodeIndex)
.flat_map(|node_id| handler.peer_id(&node_id))
.map(|peer_id| {
(
to_send.clone(),
DataCommand::SendTo(peer_id, Protocol::Validator),
)
})
.map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id)))
.collect(),
Recipient::Node(node_id) => handler
.peer_id(&node_id)
.into_iter()
.map(|peer_id| {
(
to_send.clone(),
DataCommand::SendTo(peer_id, Protocol::Validator),
)
})
.map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id)))
.collect(),
}
} else {
Expand Down Expand Up @@ -782,7 +772,7 @@ mod tests {
network::{
manager::{DiscoveryMessage, NetworkData},
mock::{crypto_basics, MockNetworkIdentity},
ConnectionCommand, DataCommand, Protocol,
ConnectionCommand, DataCommand,
},
Recipient, SessionId,
};
Expand Down Expand Up @@ -934,13 +924,10 @@ mod tests {
addresses.into_iter().collect()
))
);
assert_eq!(data.len(), 2);
assert_eq!(data.len(), 1);
assert!(data
.iter()
.any(|(_, command)| command == &DataCommand::Broadcast));
assert!(data
.iter()
.any(|(_, command)| matches!(command, &DataCommand::SendTo(_, _))));
}

#[tokio::test]
Expand Down Expand Up @@ -975,10 +962,7 @@ mod tests {
let messages = service.on_user_message(2137, session_id, Recipient::Everyone);
assert_eq!(messages.len(), 1);
let (network_data, data_command) = &messages[0];
assert!(matches!(
data_command,
DataCommand::SendTo(_, Protocol::Validator)
));
assert!(matches!(data_command, DataCommand::SendTo(_)));
assert_eq!(network_data, &NetworkData::Data(2137, session_id));
}
}
38 changes: 20 additions & 18 deletions finality-aleph/src/network/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
collections::{HashSet, VecDeque},
fmt,
sync::Arc,
time::Duration,
};

use aleph_primitives::KEY_TYPE;
Expand All @@ -14,6 +15,7 @@ use futures::{
use parking_lot::Mutex;
use rand::random;
use sp_keystore::{testing::KeyStore, CryptoStore};
use tokio::time::timeout;

use crate::{
crypto::{AuthorityPen, AuthorityVerifier},
Expand Down Expand Up @@ -106,6 +108,8 @@ pub struct Channel<T>(
pub Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<T>>>,
);

const TIMEOUT_FAIL: Duration = Duration::from_secs(10);

impl<T> Channel<T> {
pub fn new() -> Self {
let (tx, rx) = mpsc::unbounded();
Expand All @@ -117,7 +121,19 @@ impl<T> Channel<T> {
}

pub async fn next(&mut self) -> Option<T> {
self.1.lock().await.next().await
timeout(TIMEOUT_FAIL, self.1.lock().await.next())
.await
.ok()
.flatten()
}

pub async fn take(&mut self, n: usize) -> Vec<T> {
timeout(
TIMEOUT_FAIL,
self.1.lock().await.by_ref().take(n).collect::<Vec<_>>(),
)
.await
.unwrap_or(Vec::new())
}

pub async fn try_next(&self) -> Option<T> {
Expand All @@ -142,38 +158,24 @@ pub type MockData = Vec<u8>;
type MessageForUser<D, M> = (NetworkData<D, M>, DataCommand<<M as Multiaddress>::PeerId>);
type NetworkServiceIO<M> = NetworkIO<NetworkData<MockData, M>, M>;

pub struct MockIO<M: Multiaddress, LM: Multiaddress> {
pub struct MockIO<M: Multiaddress> {
pub messages_for_user: mpsc::UnboundedSender<MessageForUser<MockData, M>>,
pub messages_from_user: mpsc::UnboundedReceiver<NetworkData<MockData, M>>,
pub commands_for_manager: mpsc::UnboundedSender<ConnectionCommand<M>>,
pub legacy_messages_for_user: mpsc::UnboundedSender<MessageForUser<MockData, LM>>,
pub legacy_messages_from_user: mpsc::UnboundedReceiver<NetworkData<MockData, LM>>,
pub legacy_commands_for_manager: mpsc::UnboundedSender<ConnectionCommand<LM>>,
}

impl<M: Multiaddress + 'static, LM: Multiaddress + 'static> MockIO<M, LM> {
pub fn new() -> (MockIO<M, LM>, NetworkServiceIO<M>, NetworkServiceIO<LM>) {
impl<M: Multiaddress + 'static> MockIO<M> {
pub fn new() -> (MockIO<M>, NetworkServiceIO<M>) {
let (mock_messages_for_user, messages_from_user) = mpsc::unbounded();
let (messages_for_user, mock_messages_from_user) = mpsc::unbounded();
let (mock_commands_for_manager, commands_from_manager) = mpsc::unbounded();
let (legacy_mock_messages_for_user, legacy_messages_from_user) = mpsc::unbounded();
let (legacy_messages_for_user, legacy_mock_messages_from_user) = mpsc::unbounded();
let (legacy_mock_commands_for_manager, legacy_commands_from_manager) = mpsc::unbounded();
(
MockIO {
messages_for_user: mock_messages_for_user,
messages_from_user: mock_messages_from_user,
commands_for_manager: mock_commands_for_manager,
legacy_messages_for_user: legacy_mock_messages_for_user,
legacy_messages_from_user: legacy_mock_messages_from_user,
legacy_commands_for_manager: legacy_mock_commands_for_manager,
},
NetworkServiceIO::new(messages_from_user, messages_for_user, commands_from_manager),
NetworkServiceIO::new(
legacy_messages_from_user,
legacy_messages_for_user,
legacy_commands_from_manager,
),
)
}
}
Expand Down
8 changes: 2 additions & 6 deletions finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,9 @@ pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync {
fn add_matching_peer_id(self, peer_id: Self::PeerId) -> Option<Self>;
}

/// The Generic protocol is used for validator discovery.
/// The Validator protocol is used for validator-specific messages, i.e. ones needed for
/// finalization.
/// The Authentication protocol is used for validator discovery.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum Protocol {
Generic,
Validator,
Authentication,
}

Expand Down Expand Up @@ -165,7 +161,7 @@ pub trait RequestBlocks<B: Block>: Clone + Send + Sync + 'static {
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum DataCommand<PID: PeerId> {
Broadcast,
SendTo(PID, Protocol),
SendTo(PID),
}

/// Commands for manipulating the reserved peers set.
Expand Down
Loading