Skip to content

Commit

Permalink
fix(test): refactor structure of test code and fix echo_service test
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel-faber committed Feb 12, 2021
1 parent 37abcf9 commit 591ebf8
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 131 deletions.
48 changes: 47 additions & 1 deletion src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ pub(super) fn listen_for_incoming_messages(
let src = *remover.remote_addr();
let _ = tokio::spawn(async move {
loop {
log::info!("Listening for new messages on new thread");
let message: Option<Message> = select! {
next_uni = next_on_uni_streams(&mut uni_streams) =>
next_uni.map(|(bytes, recv)| Message::UniStream {
Expand Down Expand Up @@ -360,3 +359,50 @@ async fn next_on_bi_streams(
},
}
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;

use crate::{Error, config::Config, tests::get_incoming_connection, wire_msg::WireMsg};
use crate::api::QuicP2p;
use std::net::{IpAddr, Ipv4Addr};

#[tokio::test]
async fn echo_service() -> Result<(), Error> {
let qp2p = QuicP2p::with_config(
Some(Config {
local_port: None,
local_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
..Config::default()
}),
Default::default(),
false,
)?;

// Create Endpoint
let mut peer1 = qp2p.new_endpoint().await?;
let peer1_addr = peer1.socket_addr().await?;

let mut peer2 = qp2p.new_endpoint().await?;
let peer2_addr = peer2.socket_addr().await?;

peer2.connect_to(&peer1_addr).await?;

if let Some(connecting_peer) = get_incoming_connection(&mut peer1).await {
assert_eq!(connecting_peer, peer2_addr);
}

let connection = peer1.get_connection(&peer2_addr).ok_or_else(|| Error::MissingConnection)?;
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
let message = WireMsg::EndpointEchoReq;
message.write_to_stream(&mut send_stream.quinn_send_stream).await?;
let message = WireMsg::read_from_stream(&mut recv_stream.quinn_recv_stream).await?;
if let WireMsg::EndpointEchoResp(addr) = message {
assert_eq!(addr, peer1_addr);
} else {
anyhow!("Unexpected response to EchoService request");
}
Ok(())
}
}
73 changes: 1 addition & 72 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{api::Message, utils, Config, Endpoint, QuicP2p};
use super::{get_disconnection_event, get_incoming_connection, get_incoming_message, new_qp2p, random_msg};
use anyhow::{anyhow, Result};
use assert_matches::assert_matches;
use bytes::Bytes;
Expand All @@ -11,78 +12,6 @@ use std::{
time::Duration,
};

/// Constructs a `QuicP2p` node with some sane defaults for testing.
pub fn new_qp2p() -> Result<QuicP2p> {
new_qp2p_with_hcc(HashSet::default())
}

fn new_qp2p_with_hcc(hard_coded_contacts: HashSet<SocketAddr>) -> Result<QuicP2p> {
let qp2p = QuicP2p::with_config(
Some(Config {
local_port: Some(0),
local_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
hard_coded_contacts,
..Config::default()
}),
// Make sure we start with an empty cache. Otherwise, we might get into unexpected state.
Default::default(),
true,
)?;

Ok(qp2p)
}

fn random_msg() -> Bytes {
let random_bytes: Vec<u8> = (0..1024).map(|_| rand::random::<u8>()).collect();
Bytes::from(random_bytes)
}

// Helper function that waits for an incoming connection.
// After 3 attempts, if no incoming connection is reported it returns None.
async fn get_incoming_connection(listening_peer: &mut Endpoint) -> Option<SocketAddr> {
let mut attempts = 0;
loop {
if let Some(connecting_peer) = listening_peer.next_incoming_connection().await {
return Some(connecting_peer);
}
thread::sleep(Duration::from_secs(2));
attempts += 1;
if attempts > 2 {
return None;
}
}
}

async fn get_disconnection_event(listening_peer: &mut Endpoint) -> Option<SocketAddr> {
let mut attempts = 0;
loop {
if let Some(connecting_peer) = listening_peer.next_disconnected_peer().await {
return Some(connecting_peer);
}
thread::sleep(Duration::from_secs(2));
attempts += 1;
if attempts > 2 {
return None;
}
}
}

// Helper function that listens for incoming messages
// After 3 attemps if no message has arrived it returns None.
async fn get_incoming_message(listening_peer: &mut Endpoint) -> Option<(SocketAddr, Bytes)> {
let mut attempts = 0;
loop {
if let Some((source, message)) = listening_peer.next_incoming_message().await {
return Some((source, message));
}
thread::sleep(Duration::from_secs(2));
attempts += 1;
if attempts > 2 {
return None;
}
}
}

#[tokio::test(core_threads = 10)]
async fn successful_connection() -> Result<()> {
utils::init_logging();
Expand Down
57 changes: 0 additions & 57 deletions src/tests/echo_service.rs

This file was deleted.

87 changes: 86 additions & 1 deletion src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,87 @@
use crate::{api::Message, utils, Config, Endpoint, QuicP2p};
use anyhow::{anyhow, Result};
use assert_matches::assert_matches;
use bytes::Bytes;
use futures::future;
use quinn::EndpointError;
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
thread,
time::Duration,
};

mod common;
mod echo_service;


/// Constructs a `QuicP2p` node with some sane defaults for testing.
pub fn new_qp2p() -> Result<QuicP2p> {
new_qp2p_with_hcc(HashSet::default())
}

pub fn new_qp2p_with_hcc(hard_coded_contacts: HashSet<SocketAddr>) -> Result<QuicP2p> {
let qp2p = QuicP2p::with_config(
Some(Config {
local_port: Some(0),
local_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
hard_coded_contacts,
..Config::default()
}),
// Make sure we start with an empty cache. Otherwise, we might get into unexpected state.
Default::default(),
true,
)?;

Ok(qp2p)
}

pub fn random_msg() -> Bytes {
let random_bytes: Vec<u8> = (0..1024).map(|_| rand::random::<u8>()).collect();
Bytes::from(random_bytes)
}

// Helper function that waits for an incoming connection.
// After 3 attempts, if no incoming connection is reported it returns None.
pub async fn get_incoming_connection(listening_peer: &mut Endpoint) -> Option<SocketAddr> {
let mut attempts = 0;
loop {
if let Some(connecting_peer) = listening_peer.next_incoming_connection().await {
return Some(connecting_peer);
}
thread::sleep(Duration::from_secs(2));
attempts += 1;
if attempts > 2 {
return None;
}
}
}

pub async fn get_disconnection_event(listening_peer: &mut Endpoint) -> Option<SocketAddr> {
let mut attempts = 0;
loop {
if let Some(connecting_peer) = listening_peer.next_disconnected_peer().await {
return Some(connecting_peer);
}
thread::sleep(Duration::from_secs(2));
attempts += 1;
if attempts > 2 {
return None;
}
}
}

// Helper function that listens for incoming messages
// After 3 attemps if no message has arrived it returns None.
pub async fn get_incoming_message(listening_peer: &mut Endpoint) -> Option<(SocketAddr, Bytes)> {
let mut attempts = 0;
loop {
if let Some((source, message)) = listening_peer.next_incoming_message().await {
return Some((source, message));
}
thread::sleep(Duration::from_secs(2));
attempts += 1;
if attempts > 2 {
return None;
}
}
}

0 comments on commit 591ebf8

Please sign in to comment.