-
-
Notifications
You must be signed in to change notification settings - Fork 144
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test(subscriber): add initial integration tests (#452)
The `console-subscriber` crate has no integration tests. There are some unit tests, but without very high coverage of features. Recently, we've found or fixed a few errors which probably could have been caught by a medium level of integration testing. However, testing `console-subscriber` isn't straight forward. It is effectively a tracing subscriber (or layer) on one end, and a gRPC server on the other end. This change adds enough of a testing framework to write some initial integration tests. It is the first step towards closing #450. Each test comprises 2 parts: - One or more "expected tasks" - A future which will be driven to completion on a dedicated Tokio runtime. Behind the scenes, a console subscriber layer is created and its server part is connected to a duplex stream. The client of the duplex stream then records incoming updates and reconstructs "actual tasks". The layer itself is set as the default subscriber for the duration of `block_on` which is used to drive the provided future to completioin. The expected tasks have a set of "matches", which is how we find the actual task that we want to validate against. Currently, the only value we match on is the task's name. The expected tasks also have a set of "expectations". These are other fields on the actual task which are validated once a matching task is found. Currently, the two fields which can have expectations set on them are `wakes` and `self_wakes`. So, to construct an expected task, which will match a task with the name `"my-task"` and then validate that the matched task gets woken once, the code would be: ```rust ExpectedTask::default() .match_name("my-task") .expect_wakes(1); ``` A future which passes this test could be: ```rust async { task::Builder::new() .name("my-task") .spawn(async { tokio::time::sleep(std::time::Duration::ZERO).await }) } ``` The full test would then look like: ```rust fn wakes_once() { let expected_task = ExpectedTask::default() .match_name("my-task") .expect_wakes(1); let future = async { task::Builder::new() .name("my-task") .spawn(async { tokio::time::sleep(std::time::Duration::ZERO).await }) }; assert_task(expected_task, future); } ``` The PR depends on 2 others: - #447 which fixes an error in the logic that determines whether a task is retained in the aggregator or not. - #451 which exposes the server parts and is necessary to allow us to connect the instrument server and client via a duplex channel. This change contains some initial tests for wakes and self wakes which would have caught the error fixed in #430. Additionally there are tests for the functionality of the testing framework itself. Co-authored-by: Eliza Weisman <eliza@buoyant.io>
- Loading branch information
Showing
8 changed files
with
1,005 additions
and
0 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
//! Framework tests | ||
//! | ||
//! The tests in this module are here to verify the testing framework itself. | ||
//! As such, some of these tests may be repeated elsewhere (where we wish to | ||
//! actually test the functionality of `console-subscriber`) and others are | ||
//! negative tests that should panic. | ||
use std::time::Duration; | ||
|
||
use tokio::{task, time::sleep}; | ||
|
||
mod support; | ||
use support::{assert_task, assert_tasks, ExpectedTask}; | ||
|
||
#[test] | ||
fn expect_present() { | ||
let expected_task = ExpectedTask::default() | ||
.match_default_name() | ||
.expect_present(); | ||
|
||
let future = async { | ||
sleep(Duration::ZERO).await; | ||
}; | ||
|
||
assert_task(expected_task, future); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "Test failed: Task validation failed: | ||
- Task { name=console-test::main }: no expectations set, if you want to just expect that a matching task is present, use `expect_present()` | ||
")] | ||
fn fail_no_expectations() { | ||
let expected_task = ExpectedTask::default().match_default_name(); | ||
|
||
let future = async { | ||
sleep(Duration::ZERO).await; | ||
}; | ||
|
||
assert_task(expected_task, future); | ||
} | ||
|
||
#[test] | ||
fn wakes() { | ||
let expected_task = ExpectedTask::default().match_default_name().expect_wakes(1); | ||
|
||
let future = async { | ||
sleep(Duration::ZERO).await; | ||
}; | ||
|
||
assert_task(expected_task, future); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "Test failed: Task validation failed: | ||
- Task { name=console-test::main }: expected `wakes` to be 5, but actual was 1 | ||
")] | ||
fn fail_wakes() { | ||
let expected_task = ExpectedTask::default().match_default_name().expect_wakes(5); | ||
|
||
let future = async { | ||
sleep(Duration::ZERO).await; | ||
}; | ||
|
||
assert_task(expected_task, future); | ||
} | ||
|
||
#[test] | ||
fn self_wakes() { | ||
let expected_task = ExpectedTask::default() | ||
.match_default_name() | ||
.expect_self_wakes(1); | ||
|
||
let future = async { task::yield_now().await }; | ||
|
||
assert_task(expected_task, future); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "Test failed: Task validation failed: | ||
- Task { name=console-test::main }: expected `self_wakes` to be 1, but actual was 0 | ||
")] | ||
fn fail_self_wake() { | ||
let expected_task = ExpectedTask::default() | ||
.match_default_name() | ||
.expect_self_wakes(1); | ||
|
||
let future = async { | ||
sleep(Duration::ZERO).await; | ||
}; | ||
|
||
assert_task(expected_task, future); | ||
} | ||
|
||
#[test] | ||
fn test_spawned_task() { | ||
let expected_task = ExpectedTask::default() | ||
.match_name("another-name".into()) | ||
.expect_present(); | ||
|
||
let future = async { | ||
task::Builder::new() | ||
.name("another-name") | ||
.spawn(async { task::yield_now().await }) | ||
}; | ||
|
||
assert_task(expected_task, future); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "Test failed: Task validation failed: | ||
- Task { name=wrong-name }: no matching actual task was found | ||
")] | ||
fn fail_wrong_task_name() { | ||
let expected_task = ExpectedTask::default().match_name("wrong-name".into()); | ||
|
||
let future = async { task::yield_now().await }; | ||
|
||
assert_task(expected_task, future); | ||
} | ||
|
||
#[test] | ||
fn multiple_tasks() { | ||
let expected_tasks = vec![ | ||
ExpectedTask::default() | ||
.match_name("task-1".into()) | ||
.expect_wakes(1), | ||
ExpectedTask::default() | ||
.match_name("task-2".into()) | ||
.expect_wakes(1), | ||
]; | ||
|
||
let future = async { | ||
let task1 = task::Builder::new() | ||
.name("task-1") | ||
.spawn(async { task::yield_now().await }) | ||
.unwrap(); | ||
let task2 = task::Builder::new() | ||
.name("task-2") | ||
.spawn(async { task::yield_now().await }) | ||
.unwrap(); | ||
|
||
tokio::try_join! { | ||
task1, | ||
task2, | ||
} | ||
.unwrap(); | ||
}; | ||
|
||
assert_tasks(expected_tasks, future); | ||
} | ||
|
||
#[test] | ||
#[should_panic(expected = "Test failed: Task validation failed: | ||
- Task { name=task-2 }: expected `wakes` to be 2, but actual was 1 | ||
")] | ||
fn fail_1_of_2_expected_tasks() { | ||
let expected_tasks = vec![ | ||
ExpectedTask::default() | ||
.match_name("task-1".into()) | ||
.expect_wakes(1), | ||
ExpectedTask::default() | ||
.match_name("task-2".into()) | ||
.expect_wakes(2), | ||
]; | ||
|
||
let future = async { | ||
let task1 = task::Builder::new() | ||
.name("task-1") | ||
.spawn(async { task::yield_now().await }) | ||
.unwrap(); | ||
let task2 = task::Builder::new() | ||
.name("task-2") | ||
.spawn(async { task::yield_now().await }) | ||
.unwrap(); | ||
|
||
tokio::try_join! { | ||
task1, | ||
task2, | ||
} | ||
.unwrap(); | ||
}; | ||
|
||
assert_tasks(expected_tasks, future); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
use futures::Future; | ||
|
||
mod state; | ||
mod subscriber; | ||
mod task; | ||
|
||
use subscriber::run_test; | ||
|
||
pub(crate) use subscriber::MAIN_TASK_NAME; | ||
pub(crate) use task::ExpectedTask; | ||
|
||
/// Assert that an `expected_task` is recorded by a console-subscriber | ||
/// when driving the provided `future` to completion. | ||
/// | ||
/// This function is equivalent to calling [`assert_tasks`] with a vector | ||
/// containing a single task. | ||
/// | ||
/// # Panics | ||
/// | ||
/// This function will panic if the expectations on the expected task are not | ||
/// met or if a matching task is not recorded. | ||
#[track_caller] | ||
#[allow(dead_code)] | ||
pub(crate) fn assert_task<Fut>(expected_task: ExpectedTask, future: Fut) | ||
where | ||
Fut: Future + Send + 'static, | ||
Fut::Output: Send + 'static, | ||
{ | ||
run_test(vec![expected_task], future) | ||
} | ||
|
||
/// Assert that the `expected_tasks` are recorded by a console-subscriber | ||
/// when driving the provided `future` to completion. | ||
/// | ||
/// # Panics | ||
/// | ||
/// This function will panic if the expectations on any of the expected tasks | ||
/// are not met or if matching tasks are not recorded for all expected tasks. | ||
#[track_caller] | ||
#[allow(dead_code)] | ||
pub(crate) fn assert_tasks<Fut>(expected_tasks: Vec<ExpectedTask>, future: Fut) | ||
where | ||
Fut: Future + Send + 'static, | ||
Fut::Output: Send + 'static, | ||
{ | ||
run_test(expected_tasks, future) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
use std::fmt; | ||
|
||
use tokio::sync::broadcast::{ | ||
self, | ||
error::{RecvError, TryRecvError}, | ||
}; | ||
|
||
/// A step in the running of the test | ||
#[derive(Clone, Debug, PartialEq, PartialOrd)] | ||
pub(super) enum TestStep { | ||
/// The overall test has begun | ||
Start, | ||
/// The instrument server has been started | ||
ServerStarted, | ||
/// The client has connected to the instrument server | ||
ClientConnected, | ||
/// The future being driven has completed | ||
TestFinished, | ||
/// The client has finished recording updates | ||
UpdatesRecorded, | ||
} | ||
|
||
impl fmt::Display for TestStep { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
(self as &dyn fmt::Debug).fmt(f) | ||
} | ||
} | ||
|
||
/// The state of the test. | ||
/// | ||
/// This struct is used by various parts of the test framework to wait until | ||
/// a specific test step has been reached and advance the test state to a new | ||
/// step. | ||
pub(super) struct TestState { | ||
receiver: broadcast::Receiver<TestStep>, | ||
sender: broadcast::Sender<TestStep>, | ||
step: TestStep, | ||
} | ||
|
||
impl TestState { | ||
pub(super) fn new() -> Self { | ||
let (sender, receiver) = broadcast::channel(1); | ||
Self { | ||
receiver, | ||
sender, | ||
step: TestStep::Start, | ||
} | ||
} | ||
|
||
/// Wait asynchronously until the desired step has been reached. | ||
/// | ||
/// # Panics | ||
/// | ||
/// This function will panic if the underlying channel gets closed. | ||
pub(super) async fn wait_for_step(&mut self, desired_step: TestStep) { | ||
while self.step < desired_step { | ||
match self.receiver.recv().await { | ||
Ok(step) => self.step = step, | ||
Err(RecvError::Lagged(_)) => { | ||
// we don't mind being lagged, we'll just get the latest state | ||
} | ||
Err(RecvError::Closed) => panic!( | ||
"console-test error: failed to receive current step, \ | ||
waiting for step: {desired_step}. This shouldn't happen, \ | ||
did the test abort?" | ||
), | ||
} | ||
} | ||
} | ||
|
||
/// Returns `true` if the current step is `desired_step` or later. | ||
pub(super) fn is_step(&mut self, desired_step: TestStep) -> bool { | ||
self.update_step(); | ||
|
||
self.step == desired_step | ||
} | ||
|
||
/// Advance to the next step. | ||
/// | ||
/// The test must be at the step prior to the next step before starting. | ||
/// Being in a different step is likely to indicate a logic error in the | ||
/// test framework. | ||
/// | ||
/// # Panics | ||
/// | ||
/// This method will panic if the test state is not at the step prior to | ||
/// `next_step`, or if the underlying channel is closed. | ||
#[track_caller] | ||
pub(super) fn advance_to_step(&mut self, next_step: TestStep) { | ||
self.update_step(); | ||
|
||
assert!( | ||
self.step < next_step, | ||
"console-test error: cannot advance to previous or current step! \ | ||
current step: {current}, next step: {next_step}. This shouldn't \ | ||
happen.", | ||
current = self.step, | ||
); | ||
|
||
match (&self.step, &next_step) { | ||
(TestStep::Start, TestStep::ServerStarted) | ||
| (TestStep::ServerStarted, TestStep::ClientConnected) | ||
| (TestStep::ClientConnected, TestStep::TestFinished) | ||
| (TestStep::TestFinished, TestStep::UpdatesRecorded) => {} | ||
(current, _) => panic!( | ||
"console-test error: test cannot advance more than one step! \ | ||
current step: {current}, next step: {next_step}. This \ | ||
shouldn't happen." | ||
), | ||
} | ||
|
||
self.sender.send(next_step).expect( | ||
"console-test error: failed to send the next test step. \ | ||
This shouldn't happen, did the test abort?", | ||
); | ||
} | ||
|
||
fn update_step(&mut self) { | ||
loop { | ||
match self.receiver.try_recv() { | ||
Ok(step) => self.step = step, | ||
Err(TryRecvError::Lagged(_)) => { | ||
// we don't mind being lagged, we'll just get the latest state | ||
} | ||
Err(TryRecvError::Closed) => panic!( | ||
"console-test error: failed to update current step, did \ | ||
the test abort?" | ||
), | ||
Err(TryRecvError::Empty) => break, | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Clone for TestState { | ||
fn clone(&self) -> Self { | ||
Self { | ||
receiver: self.receiver.resubscribe(), | ||
sender: self.sender.clone(), | ||
step: self.step.clone(), | ||
} | ||
} | ||
} |
Oops, something went wrong.