Skip to content

Commit

Permalink
Don't do connection management in perf behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Mar 3, 2023
1 parent ffdf089 commit 39694af
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions protocols/perf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
anyhow = "1"
clap = { version = "4.1.6", features = ["derive"] }
either = "1.8.0"
env_logger = "0.10.0"
Expand All @@ -23,6 +24,7 @@ libp2p-swarm = { version = "0.42.0", path = "../../swarm", features = ["macros",
libp2p-tcp = { version = "0.39.0", path = "../../transports/tcp", features = ["async-io"] }
libp2p-yamux = { version = "0.43.0", path = "../../muxers/yamux" }
log = "0.4"
thiserror = "1.0"
void = "1"

[dev-dependencies]
Expand Down
22 changes: 19 additions & 3 deletions protocols/perf/src/bin/perf-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use anyhow::{bail, Result};
use clap::Parser;
use futures::{executor::block_on, future::Either, StreamExt};
use libp2p_core::{
Expand All @@ -35,7 +36,7 @@ struct Opts {
server_address: Multiaddr,
}

fn main() {
fn main() -> Result<()> {
env_logger::init();

let opts = Opts::parse();
Expand Down Expand Up @@ -77,13 +78,26 @@ fn main() {
.substream_upgrade_protocol_override(upgrade::Version::V1Lazy)
.build();

swarm.dial(opts.server_address).unwrap();
let server_peer_id = block_on(async {
loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished { peer_id, .. } => return Ok(peer_id),
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
e => panic!("{e:?}"),
}
}
})?;

swarm.behaviour_mut().perf(
opts.server_address.clone(),
server_peer_id,
RunParams {
to_send: 10 * 1024 * 1024,
to_receive: 10 * 1024 * 1024,
},
);
)?;

let stats = block_on(async {
loop {
Expand Down Expand Up @@ -121,4 +135,6 @@ fn main() {
receive_time,
receive_bandwidth_mebibit_second
);

Ok(())
}
57 changes: 36 additions & 21 deletions protocols/perf/src/client/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
//! [`NetworkBehaviour`] of the libp2p perf protocol.

use std::{
collections::{HashMap, VecDeque},
collections::{HashSet, VecDeque},
task::{Context, Poll},
};

use either::Either;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::{
derive_prelude::ConnectionEstablished, dial_opts::DialOpts, dummy, ConnectionId, FromSwarm,
derive_prelude::ConnectionEstablished, dummy, ConnectionClosed, ConnectionId, FromSwarm,
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandlerInEvent,
THandlerOutEvent,
};
Expand All @@ -42,28 +42,39 @@ pub enum Event {

#[derive(Default)]
pub struct Behaviour {
pending_run: HashMap<ConnectionId, RunParams>,
/// Queue of actions to return when polled.
queued_events: VecDeque<NetworkBehaviourAction<Event, THandlerInEvent<Self>>>,
/// Set of connected peers.
connected: HashSet<PeerId>,
}

impl Behaviour {
pub fn new() -> Self {
Self::default()
}

pub fn perf(&mut self, server: Multiaddr, params: RunParams) {
let opts: DialOpts = server.into();
let connection_id = opts.connection_id();

self.pending_run.insert(connection_id, params);
pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result<(), PerfError> {
if !self.connected.contains(&server) {
return Err(PerfError::NotConnected);
}

// TODO: What if we are already connected?
self.queued_events
.push_back(NetworkBehaviourAction::Dial { opts });
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: server,
handler: NotifyHandler::Any,
event: Either::Left(crate::client::handler::Command::Start { params }),
});

return Ok(());
}
}

#[derive(thiserror::Error, Debug)]
pub enum PerfError {
#[error("Not connected to peer")]
NotConnected,
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Either<Handler, dummy::ConnectionHandler>;
type OutEvent = Event;
Expand Down Expand Up @@ -92,20 +103,24 @@ impl NetworkBehaviour for Behaviour {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
connection_id: _,
endpoint: _,
failed_addresses: _,
other_established: _,
}) => self
.queued_events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: Either::Left(crate::client::handler::Command::Start {
params: self.pending_run.remove(&connection_id).unwrap(),
}),
}),
FromSwarm::ConnectionClosed(_) => todo!(),
}) => {
self.connected.insert(peer_id);
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id: _,
endpoint: _,
handler: _,
remaining_established,
}) => {
if remaining_established == 0 {
assert!(self.connected.remove(&peer_id));
}
}
FromSwarm::AddressChange(_) => todo!(),
FromSwarm::DialFailure(_) => todo!(),
FromSwarm::ListenFailure(_) => todo!(),
Expand Down

0 comments on commit 39694af

Please sign in to comment.