Skip to content

Commit

Permalink
[ping] Add missing flush after write. (#1770)
Browse files Browse the repository at this point in the history
After sending the ping and before awaiting the pong,
the stream must be flushed to guarantee the data is
sent before awaiting the reply.
  • Loading branch information
romanb authored Sep 28, 2020
1 parent 0b18b86 commit f66f40b
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 12 deletions.
7 changes: 7 additions & 0 deletions protocols/ping/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# 0.22.1 [unreleased]

- Ensure the outbound ping is flushed before awaiting
the response. Otherwise the behaviour depends on
implementation details of the stream muxer used.
The current behaviour resulted in stalls with Mplex.

# 0.22.0 [2020-09-09]

- Update `libp2p-swarm` and `libp2p-core`.
Expand Down
3 changes: 2 additions & 1 deletion protocols/ping/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-ping"
edition = "2018"
description = "Ping protocol for libp2p"
version = "0.22.0"
version = "0.22.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -23,4 +23,5 @@ async-std = "1.6.2"
libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] }
libp2p-noise = { path = "../../protocols/noise" }
libp2p-yamux = { path = "../../muxers/yamux" }
libp2p-mplex = { path = "../../muxers/mplex" }
quickcheck = "0.9.0"
4 changes: 4 additions & 0 deletions protocols/ping/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ where
let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard);
log::debug!("Preparing ping payload {:?}", payload);
stream.write_all(&payload).await?;
stream.flush().await?;
let started = Instant::now();
let mut recv_payload = [0u8; PING_SIZE];
log::debug!("Awaiting pong for {:?}", payload);
stream.read_exact(&mut recv_payload).await?;
if recv_payload == payload {
Ok((stream, started.elapsed()))
Expand All @@ -103,7 +105,9 @@ where
S: AsyncRead + AsyncWrite + Unpin
{
let mut payload = [0u8; PING_SIZE];
log::debug!("Waiting for ping ...");
stream.read_exact(&mut payload).await?;
log::debug!("Sending pong for {:?}", payload);
stream.write_all(&payload).await?;
stream.flush().await?;
Ok(stream)
Expand Down
43 changes: 32 additions & 11 deletions protocols/ping/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,28 @@ use libp2p_core::{
transport::{Transport, boxed::Boxed},
upgrade
};
use libp2p_mplex as mplex;
use libp2p_noise as noise;
use libp2p_ping::*;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_tcp::TcpConfig;
use libp2p_yamux as yamux;
use futures::{prelude::*, channel::mpsc};
use quickcheck::*;
use rand::prelude::*;
use std::{io, num::NonZeroU8, time::Duration};

#[test]
fn ping_pong() {
fn prop(count: NonZeroU8) {
fn prop(count: NonZeroU8, muxer: MuxerChoice) {
let cfg = PingConfig::new()
.with_keep_alive(true)
.with_interval(Duration::from_millis(10));

let (peer1_id, trans) = mk_transport();
let (peer1_id, trans) = mk_transport(muxer);
let mut swarm1 = Swarm::new(trans, Ping::new(cfg.clone()), peer1_id.clone());

let (peer2_id, trans) = mk_transport();
let (peer2_id, trans) = mk_transport(muxer);
let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone());

let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
Expand Down Expand Up @@ -103,25 +106,24 @@ fn ping_pong() {
assert!(rtt < Duration::from_millis(50));
}

QuickCheck::new().tests(3).quickcheck(prop as fn(_))
QuickCheck::new().tests(10).quickcheck(prop as fn(_,_))
}


/// Tests that the connection is closed upon a configurable
/// number of consecutive ping failures.
#[test]
fn max_failures() {
fn prop(max_failures: NonZeroU8) {
fn prop(max_failures: NonZeroU8, muxer: MuxerChoice) {
let cfg = PingConfig::new()
.with_keep_alive(true)
.with_interval(Duration::from_millis(10))
.with_timeout(Duration::from_millis(0))
.with_max_failures(max_failures.into());

let (peer1_id, trans) = mk_transport();
let (peer1_id, trans) = mk_transport(muxer);
let mut swarm1 = Swarm::new(trans, Ping::new(cfg.clone()), peer1_id.clone());

let (peer2_id, trans) = mk_transport();
let (peer2_id, trans) = mk_transport(muxer);
let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone());

let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
Expand Down Expand Up @@ -188,11 +190,11 @@ fn max_failures() {
assert_eq!(u8::max(count1, count2), max_failures.get() - 1);
}

QuickCheck::new().tests(3).quickcheck(prop as fn(_))
QuickCheck::new().tests(10).quickcheck(prop as fn(_,_))
}


fn mk_transport() -> (
fn mk_transport(muxer: MuxerChoice) -> (
PeerId,
Boxed<
(PeerId, StreamMuxerBox),
Expand All @@ -202,13 +204,32 @@ fn mk_transport() -> (
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = id_keys.public().into_peer_id();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();

let transport = TcpConfig::new()
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_yamux::Config::default())
.multiplex(match muxer {
MuxerChoice::Yamux =>
upgrade::EitherUpgrade::A(yamux::Config::default()),
MuxerChoice::Mplex =>
upgrade::EitherUpgrade::B(mplex::MplexConfig::default()),
})
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();

(peer_id, transport)
}

#[derive(Debug, Copy, Clone)]
enum MuxerChoice {
Mplex,
Yamux,
}

impl Arbitrary for MuxerChoice {
fn arbitrary<G: Gen>(g: &mut G) -> MuxerChoice {
*[MuxerChoice::Mplex, MuxerChoice::Yamux].choose(g).unwrap()
}
}

0 comments on commit f66f40b

Please sign in to comment.