Skip to content

Commit

Permalink
add infra to await specific logs in tests #69
Browse files Browse the repository at this point in the history
  • Loading branch information
marsella committed Feb 22, 2022
1 parent a7ef1c6 commit 2320d5c
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 82 deletions.
151 changes: 82 additions & 69 deletions integration_tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{
collections::HashMap,
fmt,
fs::{self, File},
io::{Read, Write},
io::Write,
process::Command,
sync::Mutex,
};
Expand All @@ -10,12 +11,15 @@ use {
futures::future::{self, Join},
rand::prelude::StdRng,
structopt::StructOpt,
thiserror::Error,
tokio::task::JoinHandle,
tracing::{error, info_span},
strum::IntoEnumIterator,
strum_macros::EnumIter,
tokio::{task::JoinHandle, time::Duration},
tracing::info_span,
tracing_futures::Instrument,
};

use crate::{await_log, TestLogs};

use zeekoe::{
customer::zkchannels::Command as _, merchant::zkchannels::Command as _, timeout::WithTimeout,
};
Expand All @@ -24,26 +28,51 @@ pub const CUSTOMER_CONFIG: &str = "integration_tests/gen/TestCustomer.toml";
pub const MERCHANT_CONFIG: &str = "integration_tests/gen/TestMerchant.toml";
pub const ERROR_FILENAME: &str = "integration_tests/gen/errors.log";

/// The default merchant services we will set up for tests (all run on localhost)
#[derive(Debug, Clone, Copy, EnumIter)]
enum MerchantServices {
IpV4,
IpV6,
}

impl MerchantServices {
fn to_str(self) -> &'static str {
match self {
Self::IpV4 => "127.0.0.1",
Self::IpV6 => "::1",
}
}
}

impl fmt::Display for MerchantServices {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// Note: this hard-codes the default port.
let ipaddr = match self {
Self::IpV4 => self.to_str().to_string(),
Self::IpV6 => format!("[{}]", self.to_str()),
};
write!(f, "{}:2611", ipaddr)
}
}

/// Give a name to the slightly annoying type of the joined server futures
type ServerFuture =
Join<JoinHandle<Result<(), anyhow::Error>>, JoinHandle<Result<(), anyhow::Error>>>;

/// Set of processes that run during a test.
#[derive(Debug, PartialEq)]
#[allow(unused)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Party {
MerchantServer,
CustomerWatcher,
/// The process corresponding to the `Operation` executed by the test harness
ActiveOperation(&'static str),
}

#[allow(unused)]
impl Party {
const fn to_str(&self) -> &str {
pub const fn to_str(self) -> &'static str {
match self {
Party::MerchantServer => "merchant server",
Party::CustomerWatcher => "customer watcher",
Party::MerchantServer => "party: merchant server",
Party::CustomerWatcher => "party: customer watcher",
Party::ActiveOperation(description) => description,
}
}
Expand Down Expand Up @@ -103,7 +132,7 @@ pub async fn setup(rng: &StdRng) -> ServerFuture {
let customer_config = customer_test_config().await;
let merchant_config = merchant_test_config().await;

// set up tracing for all of our own log messages
// set up tracing for all log messages
tracing_subscriber::fmt()
.with_writer(Mutex::new(
File::create(ERROR_FILENAME).expect("Failed to open log file"),
Expand All @@ -125,14 +154,46 @@ pub async fn setup(rng: &StdRng) -> ServerFuture {
.instrument(info_span!(Party::CustomerWatcher.to_str())),
);

// Check the logs of merchant + customer for indication of a successful set-up
// Note: hard-coded to match the 2-service merchant with default port.
let checks = vec![
await_log(
Party::MerchantServer,
TestLogs::MerchantServerSpawned(MerchantServices::IpV4.to_string()),
),
await_log(
Party::MerchantServer,
TestLogs::MerchantServerSpawned(MerchantServices::IpV6.to_string()),
),
await_log(Party::CustomerWatcher, TestLogs::CustomerWatcherSpawned),
];

// Wait up to 30sec for the servers to set up or fail
match future::join_all(checks)
.with_timeout(Duration::from_secs(30))
.await
{
Err(_) => panic!("Server setup timed out"),
Ok(results) => {
match results
.into_iter()
.collect::<Result<Vec<()>, anyhow::Error>>()
{
Ok(_) => {}
Err(err) => panic!(
"Failed to read logs while waiting for servers to set up: {:?}",
err
),
}
}
}

future::join(customer_handle, merchant_handle)
}

pub async fn teardown(server_future: ServerFuture) {
// Ignore the result because we expect it to be an `Expired` error
let _result = server_future
.with_timeout(tokio::time::Duration::new(1, 0))
.await;
let _result = server_future.with_timeout(Duration::from_secs(1)).await;

// delete data from this run
let _ = fs::remove_dir_all("integration_tests/gen/");
Expand Down Expand Up @@ -175,9 +236,9 @@ async fn merchant_test_config() -> zeekoe::merchant::Config {
});

// helper to write out the service for ipv4 and v6 localhost addresses
let generate_service = |addr: &str| {
let generate_service = |addr: MerchantServices| {
HashMap::from([
("address", addr),
("address", addr.to_str()),
("private_key", "localhost.key"),
("certificate", "localhost.crt"),
])
Expand All @@ -187,12 +248,11 @@ async fn merchant_test_config() -> zeekoe::merchant::Config {
})
};

let contents = format!(
"{}{}\n{}",
tezos_contents,
generate_service("::1"),
generate_service("127.0.0.1")
);
let services = MerchantServices::iter()
.map(generate_service)
.fold(String::new(), |acc, next| format!("{}\n{}", acc, next));

let contents = format!("{}{}", tezos_contents, services,);

write_config_file(MERCHANT_CONFIG, contents);

Expand All @@ -212,50 +272,3 @@ fn write_config_file(path: &str, contents: String) {
.write_all(contents.as_bytes())
.unwrap_or_else(|_| panic!("Failed to write to config file: {}", path));
}

#[derive(Debug, Error)]
#[allow(unused)]
pub enum LogError {
#[error("Failed to open log file: {0}")]
OpenFailed(std::io::Error),
#[error("Failed to read contents of file: {0}")]
ReadFailed(std::io::Error),
}

#[allow(unused)]
pub enum LogType {
Info,
Error,
Warn,
}

#[allow(unused)]
impl LogType {
pub fn to_str(&self) -> &str {
match self {
LogType::Info => "INFO",
LogType::Error => "ERROR",
LogType::Warn => "WARN",
}
}
}

/// Get any errors from the log file.
///
/// Current behavior: outputs the entire log
/// Ideal behavior: pass a Party, maybe an indicator of which test / channel name we want. Return
/// only the lines relevant to that setting.
#[allow(unused)]
pub fn get_logs(log_type: LogType, party: Party) -> Result<String, LogError> {
let mut file = File::open(ERROR_FILENAME).map_err(LogError::OpenFailed)?;
let mut logs = String::new();
file.read_to_string(&mut logs)
.map_err(LogError::ReadFailed)?;

Ok(logs
.lines()
.filter(|s| s.contains("zeekoe::"))
.filter(|s| s.contains(log_type.to_str()))
.filter(|s| s.contains(party.to_str()))
.fold("".to_string(), |acc, s| format!("{}{}\n", acc, s)))
}
76 changes: 69 additions & 7 deletions integration_tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@ use zeekoe::{
customer::{self, database::StateName as CustomerStatus, zkchannels::Command},
merchant::{self, zkchannels::Command as _},
protocol::ChannelStatus as MerchantStatus,
TestLogs,
};

use {
anyhow::Context,
common::{customer_cli, merchant_cli, LogType, Party},
common::{customer_cli, merchant_cli, Party},
rand::prelude::StdRng,
std::{fs::OpenOptions, panic, time::Duration},
std::{
fs::{File, OpenOptions},
io::Read,
panic,
time::Duration,
},
structopt::StructOpt,
thiserror::Error,
};
Expand All @@ -27,9 +33,6 @@ pub async fn main() {
.await
.expect("Failed to load merchant config");

// Give the server some time to get set up
tokio::time::sleep(tokio::time::Duration::new(5, 0)).await;

// Run every test, printing out details if it fails
let tests = tests();
println!("Executing {} tests", tests.len());
Expand Down Expand Up @@ -161,8 +164,8 @@ impl Test {
// - logs are deleted after each test, so all errors correspond to this test
// - any Operation that throws an error is the final Operation in the test
// These mean that any error found in the logs is caused by the current operation
let customer_errors = common::get_logs(LogType::Error, Party::CustomerWatcher)?;
let merchant_errors = common::get_logs(LogType::Error, Party::MerchantServer)?;
let customer_errors = get_logs(LogType::Error, Party::CustomerWatcher)?;
let merchant_errors = get_logs(LogType::Error, Party::MerchantServer)?;

// Check whether the process errors matched the expectation.
match (
Expand Down Expand Up @@ -278,3 +281,62 @@ struct Outcome {
/// Which process, if any, had an error? Assumes that exactly one party will error.
error: Option<Party>,
}

#[derive(Debug, Error)]
#[allow(unused)]
pub enum LogError {
#[error("Failed to open log file: {0}")]
OpenFailed(std::io::Error),
#[error("Failed to read contents of file: {0}")]
ReadFailed(std::io::Error),
}

#[allow(unused)]
#[derive(Debug, Clone, Copy)]
pub enum LogType {
Info,
Error,
Warn,
}

#[allow(unused)]
impl LogType {
pub fn to_str(&self) -> &str {
match self {
LogType::Info => "INFO",
LogType::Error => "ERROR",
LogType::Warn => "WARN",
}
}
}

/// Get any errors from the log file.
///
/// Current behavior: outputs the entire log
/// Ideal behavior: pass a Party, maybe an indicator of which test / channel name we want. Return
/// only the lines relevant to that setting.
fn get_logs(log_type: LogType, party: Party) -> Result<String, LogError> {
let mut file = File::open(common::ERROR_FILENAME).map_err(LogError::OpenFailed)?;
let mut logs = String::new();
file.read_to_string(&mut logs)
.map_err(LogError::ReadFailed)?;

Ok(logs
.lines()
.filter(|s| s.contains("zeekoe::"))
.filter(|s| s.contains(log_type.to_str()))
.filter(|s| s.contains(party.to_str()))
.fold("".to_string(), |acc, s| format!("{}{}\n", acc, s)))
}

/// Wait for the log file to contain a specific entry.
///
/// This checks the log every 1 second; refactor if greater granularity is needed.
async fn await_log(party: Party, log: TestLogs) -> Result<(), anyhow::Error> {
loop {
if get_logs(LogType::Info, party)?.contains(&log.to_string()) {
return Ok(());
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
22 changes: 22 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,25 @@ mod database;
mod defaults;
mod transport;
mod zkchannels;

use std::fmt;

/// Logs used to verify that an operation completed in the integration tests.
pub enum TestLogs {
CustomerWatcherSpawned,
/// Merchant server successfully serving at address described by parameter.
MerchantServerSpawned(String),
}

impl fmt::Display for TestLogs {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{}",
match self {
TestLogs::CustomerWatcherSpawned => "customer watcher created successfully".into(),
TestLogs::MerchantServerSpawned(addr) => format!("serving on: {:?}", addr),
}
)
}
}
5 changes: 3 additions & 2 deletions src/transport/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The server side of Zeekoe's transport layer.
use tracing::{error, info};
use {
crate::TestLogs,
dialectic::prelude::*,
dialectic_reconnect::resume,
dialectic_tokio_serde::codec::LengthDelimitedCodec,
Expand All @@ -13,6 +13,7 @@ use {
thiserror::Error,
tokio::{net::TcpListener, select, sync::mpsc},
tokio_rustls::{rustls, TlsAcceptor},
tracing::{error, info},
};

use super::{channel::TransportError, handshake, io_stream::IoStream, pem};
Expand Down Expand Up @@ -194,7 +195,7 @@ where

// Bind to the address and serve
let address = address.into();
info!("serving on: {:?}", address);
info!("{}", TestLogs::MerchantServerSpawned(address.to_string()));
let listener = TcpListener::bind(address).await?;

// Loop over incoming TCP connections until `initialize` returns `None`
Expand Down
Loading

0 comments on commit 2320d5c

Please sign in to comment.