Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-1413: remove legacy network #719

Merged
merged 9 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,6 @@ fn setup(
),
ServiceError,
> {
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Generic));
config
.network
.extra_sets
Expand Down Expand Up @@ -246,7 +242,7 @@ fn setup(

/// Builds a new service for a full client.
pub fn new_authority(
mut config: Configuration,
config: Configuration,
aleph_config: AlephCli,
) -> Result<TaskManager, ServiceError> {
let sc_service::PartialComponents {
Expand All @@ -259,10 +255,6 @@ pub fn new_authority(
transaction_pool,
other: (block_import, justification_tx, justification_rx, mut telemetry, metrics),
} = new_partial(&config)?;
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Validator));

let backup_path = get_backup_path(
&aleph_config,
Expand Down
6 changes: 6 additions & 0 deletions docker/docker-compose.bridged.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ services:
networks:
- main
- Node0
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node0:30343

Node1:
extends:
Expand All @@ -15,6 +17,7 @@ services:
- main
- Node1
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node1:30344
maciejnems marked this conversation as resolved.
Show resolved Hide resolved
- BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID

Node2:
Expand All @@ -25,6 +28,7 @@ services:
- main
- Node2
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node2:30345
- BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID

Node3:
Expand All @@ -35,6 +39,7 @@ services:
- main
- Node3
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node3:30346
- BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID

Node4:
Expand All @@ -45,6 +50,7 @@ services:
- main
- Node4
environment:
- PUBLIC_VALIDATOR_ADDRESS=Node4:30347
- BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID

networks:
Expand Down
8 changes: 0 additions & 8 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,6 @@ pub fn peers_set_config(protocol: Protocol) -> sc_network::config::NonDefaultSet
);

config.set_config = match protocol {
// No spontaneous connections, only reserved nodes added by the network logic.
Protocol::Validator => sc_network::config::SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
},
Protocol::Generic => sc_network::config::SetConfig::default(),
Protocol::Authentication => sc_network::config::SetConfig::default(),
};
config
Expand Down
63 changes: 7 additions & 56 deletions finality-aleph/src/network/manager/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::{
};

use codec::{Decode, Encode};
use log::{debug, info, trace, warn};
use log::{debug, info, trace};

use crate::{
network::{
manager::{Authentication, SessionHandler},
DataCommand, Multiaddress, Protocol,
DataCommand, Multiaddress,
},
NodeIndex, SessionId,
};
Expand All @@ -33,7 +33,7 @@ impl<M: Multiaddress> DiscoveryMessage<M> {
}
}

/// Handles creating and responding to discovery messages.
/// Handles creating and rebroadcasting discovery messages.
pub struct Discovery<M: Multiaddress> {
cooldown: Duration,
last_broadcast: HashMap<NodeIndex, Instant>,
Expand All @@ -54,16 +54,6 @@ fn authentication_broadcast<M: Multiaddress>(
)
}

fn response<M: Multiaddress>(
authentication: Authentication<M>,
peer_id: M::PeerId,
) -> DiscoveryCommand<M> {
(
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(peer_id, Protocol::Generic),
)
}

impl<M: Multiaddress> Discovery<M> {
/// Create a new discovery handler with the given response/broadcast cooldown.
pub fn new(cooldown: Duration) -> Self {
Expand Down Expand Up @@ -122,16 +112,6 @@ impl<M: Multiaddress> Discovery<M> {
}
let node_id = authentication.0.creator();
let mut messages = Vec::new();
match handler.peer_id(&node_id) {
Some(peer_id) => {
if let Some(handler_authentication) = handler.authentication() {
messages.push(response(handler_authentication, peer_id));
}
}
None => {
warn!(target: "aleph-network", "Id of correctly authenticated peer not present.")
}
}
if self.should_rebroadcast(&node_id) {
trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication);
self.last_broadcast.insert(node_id, Instant::now());
Expand Down Expand Up @@ -256,7 +236,7 @@ mod tests {
}

#[tokio::test]
async fn rebroadcasts_responds_and_accepts_addresses() {
async fn rebroadcasts_and_accepts_addresses() {
let (mut discovery, mut handlers, _) = build().await;
let authentication = handlers[1].authentication().unwrap();
let handler = &mut handlers[0];
Expand All @@ -265,19 +245,15 @@ mod tests {
handler,
);
assert_eq!(addresses, authentication.0.addresses());
assert_eq!(commands.len(), 2);
assert_eq!(commands.len(), 1);
assert!(commands.iter().any(|command| matches!(command, (
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
) if rebroadcast_authentication == &authentication)));
assert!(commands.iter().any(|command| matches!(command, (
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(_, _),
) if *authentication == handler.authentication().unwrap())));
}

#[tokio::test]
async fn non_validators_rebroadcasts_responds() {
async fn non_validators_rebroadcasts() {
let (mut discovery, handlers, mut non_validator) = build().await;
let authentication = handlers[1].authentication().unwrap();
let (addresses, commands) = discovery.handle_message(
Expand All @@ -293,7 +269,7 @@ mod tests {
}

#[tokio::test]
async fn does_not_rebroadcast_nor_respond_to_wrong_authentications() {
async fn does_not_rebroadcast_wrong_authentications() {
let (mut discovery, mut handlers, _) = build().await;
let (auth_data, _) = handlers[1].authentication().unwrap();
let (_, signature) = handlers[2].authentication().unwrap();
Expand All @@ -307,31 +283,6 @@ mod tests {
assert!(commands.is_empty());
}

#[tokio::test]
async fn does_not_rebroadcast_quickly_but_still_responds() {
let (mut discovery, mut handlers, _) = build().await;
let authentication = handlers[1].authentication().unwrap();
let handler = &mut handlers[0];
discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
handler,
);
let (addresses, commands) = discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
handler,
);
assert_eq!(addresses.len(), authentication.0.addresses().len());
assert_eq!(
addresses[0].encode(),
authentication.0.addresses()[0].encode()
);
assert_eq!(commands.len(), 1);
assert!(matches!(&commands[0], (
DiscoveryMessage::Authentication(authentication),
DataCommand::SendTo(_, _),
) if *authentication == handler.authentication().unwrap()));
}

#[tokio::test]
async fn rebroadcasts_after_cooldown() {
let (mut discovery, mut handlers, _) = build().await;
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub type Authentication<M> = (AuthData<M>, Signature);
/// The order of the data and session_id is fixed in encode and the decode expects it to be data, session_id.
/// Since data is versioned, i.e. it's encoding starts with a version number in the standardized way,
/// this will allow us to retrofit versioning here if we ever need to change this structure.
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DataInSession<D: Data> {
pub data: D,
pub session_id: SessionId,
Expand Down
28 changes: 6 additions & 22 deletions finality-aleph/src/network/manager/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
Connections, Discovery, DiscoveryMessage, NetworkData, SessionHandler,
SessionHandlerError,
},
ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId, Protocol,
ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId,
},
MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL,
};
Expand Down Expand Up @@ -448,22 +448,12 @@ impl<NI: NetworkIdentity, D: Data> Service<NI, D> {
Recipient::Everyone => (0..handler.node_count().0)
.map(NodeIndex)
.flat_map(|node_id| handler.peer_id(&node_id))
.map(|peer_id| {
(
to_send.clone(),
DataCommand::SendTo(peer_id, Protocol::Validator),
)
})
.map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id)))
.collect(),
Recipient::Node(node_id) => handler
.peer_id(&node_id)
.into_iter()
.map(|peer_id| {
(
to_send.clone(),
DataCommand::SendTo(peer_id, Protocol::Validator),
)
})
.map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id)))
.collect(),
}
} else {
Expand Down Expand Up @@ -782,7 +772,7 @@ mod tests {
network::{
manager::{DiscoveryMessage, NetworkData},
mock::{crypto_basics, MockNetworkIdentity},
ConnectionCommand, DataCommand, Protocol,
ConnectionCommand, DataCommand,
},
Recipient, SessionId,
};
Expand Down Expand Up @@ -934,13 +924,10 @@ mod tests {
addresses.into_iter().collect()
))
);
assert_eq!(data.len(), 2);
assert_eq!(data.len(), 1);
assert!(data
.iter()
.any(|(_, command)| command == &DataCommand::Broadcast));
assert!(data
.iter()
.any(|(_, command)| matches!(command, &DataCommand::SendTo(_, _))));
}

#[tokio::test]
Expand Down Expand Up @@ -975,10 +962,7 @@ mod tests {
let messages = service.on_user_message(2137, session_id, Recipient::Everyone);
assert_eq!(messages.len(), 1);
let (network_data, data_command) = &messages[0];
assert!(matches!(
data_command,
DataCommand::SendTo(_, Protocol::Validator)
));
assert!(matches!(data_command, DataCommand::SendTo(_)));
assert_eq!(network_data, &NetworkData::Data(2137, session_id));
}
}
38 changes: 20 additions & 18 deletions finality-aleph/src/network/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
collections::{HashSet, VecDeque},
fmt,
sync::Arc,
time::Duration,
};

use aleph_primitives::KEY_TYPE;
Expand All @@ -14,6 +15,7 @@ use futures::{
use parking_lot::Mutex;
use rand::random;
use sp_keystore::{testing::KeyStore, CryptoStore};
use tokio::time::timeout;

use crate::{
crypto::{AuthorityPen, AuthorityVerifier},
Expand Down Expand Up @@ -106,6 +108,8 @@ pub struct Channel<T>(
pub Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<T>>>,
);

const TIMEOUT_FAIL: Duration = Duration::from_secs(10);

impl<T> Channel<T> {
pub fn new() -> Self {
let (tx, rx) = mpsc::unbounded();
Expand All @@ -117,7 +121,19 @@ impl<T> Channel<T> {
}

pub async fn next(&mut self) -> Option<T> {
self.1.lock().await.next().await
timeout(TIMEOUT_FAIL, self.1.lock().await.next())
.await
.ok()
.flatten()
}

pub async fn take(&mut self, n: usize) -> Vec<T> {
timeout(
TIMEOUT_FAIL,
self.1.lock().await.by_ref().take(n).collect::<Vec<_>>(),
)
.await
.unwrap_or(Vec::new())
}

pub async fn try_next(&self) -> Option<T> {
Expand All @@ -142,38 +158,24 @@ pub type MockData = Vec<u8>;
type MessageForUser<D, M> = (NetworkData<D, M>, DataCommand<<M as Multiaddress>::PeerId>);
type NetworkServiceIO<M> = NetworkIO<NetworkData<MockData, M>, M>;

pub struct MockIO<M: Multiaddress, LM: Multiaddress> {
pub struct MockIO<M: Multiaddress> {
pub messages_for_user: mpsc::UnboundedSender<MessageForUser<MockData, M>>,
pub messages_from_user: mpsc::UnboundedReceiver<NetworkData<MockData, M>>,
pub commands_for_manager: mpsc::UnboundedSender<ConnectionCommand<M>>,
pub legacy_messages_for_user: mpsc::UnboundedSender<MessageForUser<MockData, LM>>,
pub legacy_messages_from_user: mpsc::UnboundedReceiver<NetworkData<MockData, LM>>,
pub legacy_commands_for_manager: mpsc::UnboundedSender<ConnectionCommand<LM>>,
}

impl<M: Multiaddress + 'static, LM: Multiaddress + 'static> MockIO<M, LM> {
pub fn new() -> (MockIO<M, LM>, NetworkServiceIO<M>, NetworkServiceIO<LM>) {
impl<M: Multiaddress + 'static> MockIO<M> {
pub fn new() -> (MockIO<M>, NetworkServiceIO<M>) {
let (mock_messages_for_user, messages_from_user) = mpsc::unbounded();
let (messages_for_user, mock_messages_from_user) = mpsc::unbounded();
let (mock_commands_for_manager, commands_from_manager) = mpsc::unbounded();
let (legacy_mock_messages_for_user, legacy_messages_from_user) = mpsc::unbounded();
let (legacy_messages_for_user, legacy_mock_messages_from_user) = mpsc::unbounded();
let (legacy_mock_commands_for_manager, legacy_commands_from_manager) = mpsc::unbounded();
(
MockIO {
messages_for_user: mock_messages_for_user,
messages_from_user: mock_messages_from_user,
commands_for_manager: mock_commands_for_manager,
legacy_messages_for_user: legacy_mock_messages_for_user,
legacy_messages_from_user: legacy_mock_messages_from_user,
legacy_commands_for_manager: legacy_mock_commands_for_manager,
},
NetworkServiceIO::new(messages_from_user, messages_for_user, commands_from_manager),
NetworkServiceIO::new(
legacy_messages_from_user,
legacy_messages_for_user,
legacy_commands_from_manager,
),
)
}
}
Expand Down
Loading