Skip to content

Commit

Permalink
[Network] Hacking multi-socket support.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Oct 5, 2023
1 parent 7a1a302 commit e2abe87
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 91 deletions.
2 changes: 1 addition & 1 deletion crates/aptos-crypto/src/noise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ const _: [(); 32] = [(); HashValue::LENGTH];
//

/// A NoiseError enum represents the different types of error that noise can return to users of the crate
#[derive(Debug, Error)]
#[derive(Clone, Debug, Error)]
pub enum NoiseError {
/// the received message is too short to contain the expected data
#[error("noise: the received message is too short to contain the expected data")]
Expand Down
2 changes: 1 addition & 1 deletion network/framework/src/noise/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::io;
use thiserror::Error;

/// Different errors than can be raised when negotiating a Noise handshake.
#[derive(Debug, Error)]
#[derive(Clone, Debug, Error)]
pub enum NoiseHandshakeError {
#[error("noise client: MUST_FIX: missing remote server's public key when dialing")]
MissingServerPublicKey,
Expand Down
2 changes: 1 addition & 1 deletion network/framework/src/peer/fuzzing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub fn fuzz(data: &[u8]) {
ProtocolIdSet::all_known(),
PeerRole::Unknown,
);
let connection = Connection { socket, metadata };
let connection = Connection::new_with_single_socket(socket, metadata);

let (connection_notifs_tx, connection_notifs_rx) = aptos_channels::new_test(8);
let channel_size = 8;
Expand Down
13 changes: 7 additions & 6 deletions network/framework/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ pub struct Peer<TSocket> {
time_service: TimeService,
/// Connection specific information.
connection_metadata: ConnectionMetadata,
/// Underlying connection.
connection: Option<TSocket>,
/// Underlying connection (with multiple sockets) to the remote peer
connection: Option<Vec<TSocket>>,
/// Channel to notify PeerManager that we've disconnected.
connection_notifs_tx: aptos_channels::Sender<TransportNotification<TSocket>>,
/// Channel to receive requests from PeerManager to send messages and rpcs.
Expand Down Expand Up @@ -161,7 +161,7 @@ where
) -> Self {
let Connection {
metadata: connection_metadata,
socket,
sockets,
} = connection;
let remote_peer_id = connection_metadata.remote_peer_id;
let max_fragments = max_message_size / max_frame_size;
Expand All @@ -170,7 +170,7 @@ where
executor,
time_service: time_service.clone(),
connection_metadata,
connection: Some(socket),
connection: Some(sockets),
connection_notifs_tx,
peer_reqs_rx,
peer_notifs_tx,
Expand Down Expand Up @@ -209,8 +209,9 @@ where
);

// Split the connection into a ReadHalf and a WriteHalf.
let (read_socket, write_socket) =
tokio::io::split(self.connection.take().unwrap().compat());
let mut connection = self.connection.take().unwrap();
let socket = connection.remove(0);
let (read_socket, write_socket) = tokio::io::split(socket.compat());

let mut reader =
MultiplexMessageStream::new(read_socket.compat(), self.max_frame_size).fuse();
Expand Down
24 changes: 11 additions & 13 deletions network/framework/src/peer/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,16 @@ fn build_test_peer(
) {
let (a, b) = MemorySocket::new_pair();
let peer_id = PeerId::random();
let connection = Connection {
metadata: ConnectionMetadata::new(
peer_id,
ConnectionId::default(),
NetworkAddress::from_str("/ip4/127.0.0.1/tcp/8081").unwrap(),
origin,
MessagingProtocolVersion::V1,
ProtocolIdSet::empty(),
PeerRole::Unknown,
),
socket: a,
};
let connection_metadata = ConnectionMetadata::new(
peer_id,
ConnectionId::default(),
NetworkAddress::from_str("/ip4/127.0.0.1/tcp/8081").unwrap(),
origin,
MessagingProtocolVersion::V1,
ProtocolIdSet::empty(),
PeerRole::Unknown,
);
let connection = Connection::new_with_single_socket(a, connection_metadata);

let (connection_notifs_tx, connection_notifs_rx) = aptos_channels::new_test(1);
let (peer_reqs_tx, peer_reqs_rx) =
Expand Down Expand Up @@ -123,7 +121,7 @@ fn build_test_connected_peers(
build_test_peer(executor, time_service, ConnectionOrigin::Outbound);

// Make sure both peers are connected
peer_b.connection = Some(connection_a);
peer_b.connection = Some(vec![connection_a]);
(
(
peer_a,
Expand Down
19 changes: 10 additions & 9 deletions network/framework/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,19 @@ use crate::{
counters::{self},
logging::*,
peer::{Peer, PeerNotification, PeerRequest},
transport::{
Connection, ConnectionId, ConnectionMetadata, TSocket as TransportTSocket,
TRANSPORT_TIMEOUT,
},
transport::{Connection, ConnectionId, ConnectionMetadata, TSocket as TransportTSocket},
ProtocolId,
};
use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
use aptos_config::network_id::{NetworkContext, PeerNetworkId};
use aptos_logger::prelude::*;
use aptos_netcore::transport::{ConnectionOrigin, Transport};
use aptos_short_hex_str::AsShortHexStr;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_time_service::TimeService;
use aptos_types::{network_address::NetworkAddress, PeerId};
use futures::{
channel::oneshot,
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
io::{AsyncRead, AsyncWrite},
sink::SinkExt,
stream::StreamExt,
};
Expand Down Expand Up @@ -569,16 +566,19 @@ where
}
}

fn disconnect(&mut self, connection: Connection<TSocket>) {
fn disconnect(&mut self, _connection: Connection<TSocket>) {

// Close connection, and drop it.
// TODO: fix me!
/*
let network_context = self.network_context;
let time_service = self.time_service.clone();
// Close connection, and drop it
let drop_fut = async move {
let mut connection = connection;
let peer_id = connection.metadata.remote_peer_id;
if let Err(e) = time_service
.timeout(TRANSPORT_TIMEOUT, connection.socket.close())
.timeout(TRANSPORT_TIMEOUT, connection.close_sockets())
.await
{
warn!(
Expand All @@ -593,6 +593,7 @@ where
};
};
self.executor.spawn(drop_fut);
*/
}

fn add_peer(&mut self, connection: Connection<TSocket>) -> Result<(), Error> {
Expand Down
45 changes: 22 additions & 23 deletions network/framework/src/peer_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@ pub fn build_test_transport(

memory_transport
.and_then(move |socket, addr, origin| async move {
Ok(Connection {
let connection_metadata = ConnectionMetadata::new(
PeerId::random(),
ConnectionId::default(),
addr,
origin,
MessagingProtocolVersion::V1,
ProtocolIdSet::mock(),
PeerRole::Unknown,
);
Ok(Connection::new_with_single_socket(
socket,
metadata: ConnectionMetadata::new(
PeerId::random(),
ConnectionId::default(),
addr,
origin,
MessagingProtocolVersion::V1,
ProtocolIdSet::mock(),
PeerRole::Unknown,
),
})
connection_metadata,
))
})
.boxed()
}
Expand Down Expand Up @@ -234,18 +235,16 @@ fn create_connection<TSocket: transport::TSocket>(
origin: ConnectionOrigin,
connection_id: ConnectionId,
) -> Connection<TSocket> {
Connection {
socket,
metadata: ConnectionMetadata::new(
peer_id,
connection_id,
addr,
origin,
MessagingProtocolVersion::V1,
ProtocolIdSet::mock(),
PeerRole::Unknown,
),
}
let connection_metadata = ConnectionMetadata::new(
peer_id,
connection_id,
addr,
origin,
MessagingProtocolVersion::V1,
ProtocolIdSet::mock(),
PeerRole::Unknown,
);
Connection::new_with_single_socket(socket, connection_metadata)
}

#[test]
Expand Down
78 changes: 53 additions & 25 deletions network/framework/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,40 @@ impl fmt::Display for ConnectionMetadata {
}
}

/// The `Connection` struct consists of connection metadata and the actual socket for
/// communication.
/// The `Connection` struct consists of connection metadata and the actual
/// sockets for communication.
#[derive(Debug)]
pub struct Connection<TSocket> {
pub socket: TSocket,
pub sockets: Vec<TSocket>,
pub metadata: ConnectionMetadata,
}

impl<TSocket> Connection<TSocket> {
/// Returns a new connection with a single socket
pub fn new_with_single_socket(
socket: TSocket,
metadata: ConnectionMetadata,
) -> Connection<TSocket> {
Connection {
sockets: vec![socket],
metadata,
}
}

/// Returns a new connection with multiple sockets
pub fn new_with_multiple_sockets(
sockets: Vec<TSocket>,
metadata: ConnectionMetadata,
) -> Connection<TSocket> {
Connection { sockets, metadata }
}

/// Returns a reference to the first socket in the connection
pub fn get_first_socket(&mut self) -> &mut TSocket {
&mut self.sockets[0]
}
}

/// Convenience function for adding a timeout to a Future that returns an `io::Result`.
async fn timeout_io<F, T>(time_service: TimeService, duration: Duration, fut: F) -> io::Result<T>
where
Expand Down Expand Up @@ -319,18 +345,19 @@ async fn upgrade_inbound<T: TSocket>(
})?;

// return successful connection
Ok(Connection {
let connection_metadata = ConnectionMetadata::new(
remote_peer_id,
CONNECTION_ID_GENERATOR.next(),
addr,
origin,
messaging_protocol,
application_protocols,
peer_role,
);
Ok(Connection::new_with_single_socket(
socket,
metadata: ConnectionMetadata::new(
remote_peer_id,
CONNECTION_ID_GENERATOR.next(),
addr,
origin,
messaging_protocol,
application_protocols,
peer_role,
),
})
connection_metadata,
))
}

/// Upgrade an outbound connection. This means we run a Noise IK handshake for
Expand Down Expand Up @@ -394,18 +421,19 @@ pub async fn upgrade_outbound<T: TSocket>(
})?;

// return successful connection
Ok(Connection {
let connection_metadata = ConnectionMetadata::new(
remote_peer_id,
CONNECTION_ID_GENERATOR.next(),
addr,
origin,
messaging_protocol,
application_protocols,
peer_role,
);
Ok(Connection::new_with_single_socket(
socket,
metadata: ConnectionMetadata::new(
remote_peer_id,
CONNECTION_ID_GENERATOR.next(),
addr,
origin,
messaging_protocol,
application_protocols,
peer_role,
),
})
connection_metadata,
))
}

/// The common AptosNet Transport.
Expand Down
Loading

0 comments on commit e2abe87

Please sign in to comment.