Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize fast path with Duration-based wait time #368

Merged
merged 10 commits into from
Oct 13, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.14.1-dev
- [#364](https://github.com/tag1consulting/goose/pull/364) add link from the [Developer Documentation](https://docs.rs/goose) to [The Git Book](https://book.goose.rs)
- [#368](https://github.com/tag1consulting/goose/pull/368) optimize fastpath if no delay between tasks

## 0.14.0 September 15, 2021
- [#361](https://github.com/tag1consulting/goose/pull/361) convert `README.md` (and enhance) into [`The Goose Book`](https://book.goose.rs/)
Expand Down
3 changes: 1 addition & 2 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.

use std::time::Duration;

use goose::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), GooseError> {
Expand Down
1 change: 0 additions & 1 deletion examples/simple_closure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
//! limitations under the License.

use goose::prelude::*;

use std::boxed::Box;
use std::sync::Arc;
use std::time::Duration;
Expand Down
3 changes: 1 addition & 2 deletions examples/simple_with_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.

use std::time::Duration;

use goose::prelude::*;
use serde::Deserialize;
use std::time::Duration;

struct Session {
jwt_token: String,
Expand Down
5 changes: 2 additions & 3 deletions examples/umami/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ mod common;
mod english;
mod spanish;

use std::time::Duration;

use goose::prelude::*;
use std::time::Duration;
jeremyandrews marked this conversation as resolved.
Show resolved Hide resolved

use crate::admin::*;
use crate::english::*;
Expand Down Expand Up @@ -73,7 +72,7 @@ async fn main() -> Result<(), GooseError> {
.register_taskset(
taskset!("Admin user")
.set_weight(1)?
.set_wait_time(Duration::from_secs(0), Duration::from_secs(3))?
.set_wait_time(Duration::from_secs(3), Duration::from_secs(10))?
jeremyandrews marked this conversation as resolved.
Show resolved Hide resolved
.register_task(task!(log_in).set_on_start().set_name("auth /en/user/login"))
.register_task(task!(front_page_en).set_name("auth /").set_weight(2)?)
.register_task(task!(article_listing_en).set_name("auth /en/articles/"))
Expand Down
27 changes: 14 additions & 13 deletions src/goose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@
//!
//! ### Task Set Wait Time
//!
//! Wait time is specified as a low-high duration range. Each time a task completes in
//! the task set, the user will pause for a random number of milliseconds inclusively between
//! Wait time is specified as a low-high Duration range. Each time a task completes in the
//! task set, the user will pause for a random number of milliseconds inclusively between
//! the low and high wait times. In the following example, users loading `foo` tasks will
//! sleep 0 to 3 seconds after each task completes, and users loading `bar` tasks will
//! sleep 0 to 2.5 seconds after each task completes, and users loading `bar` tasks will
//! sleep 5 to 10 seconds after each task completes.
//!
//! ```rust
//! use goose::prelude::*;
//! use std::time::Duration;
//!
//! let mut foo_tasks = taskset!("FooTasks").set_wait_time(Duration::from_secs(0), Duration::from_secs(3)).unwrap();
//! let mut foo_tasks = taskset!("FooTasks").set_wait_time(Duration::from_secs(0), Duration::from_millis(2500)).unwrap();
//! let mut bar_tasks = taskset!("BarTasks").set_wait_time(Duration::from_secs(5), Duration::from_secs(10)).unwrap();
//! ```
//! ## Creating Tasks
Expand Down Expand Up @@ -472,7 +472,8 @@ pub struct GooseTaskSet {
pub task_sets_index: usize,
/// An integer value that controls the frequency that this task set will be assigned to a user.
pub weight: usize,
/// An range of duration indicating the interval a user will sleep after running a task.
/// A [`Duration`](https://doc.rust-lang.org/std/time/struct.Duration.html) range defining the
/// minimum and maximum time a [`GooseUser`] should sleep after running a task.
pub task_wait: Option<(Duration, Duration)>,
/// A vector containing one copy of each [`GooseTask`](./struct.GooseTask.html) that will
/// run by users running this task set.
Expand Down Expand Up @@ -591,9 +592,8 @@ impl GooseTaskSet {
self
}

/// Configure a duration per task_set to pause after running each task. The length of the pause will be randomly
/// selected from `min_wait` to `max_wait` inclusively. For example, if `min_wait` is `Duration::from_secs(0)` and
/// `max_wait` is `Duration::from_secs(2)`, the user will randomly sleep between 0 and 2_000 milliseconds after each task completes.
/// Configure a task_set to to pause after running each task. The length of the pause will be randomly
/// selected from `min_wait` to `max_wait` inclusively.
///
/// # Example
/// ```rust
Expand All @@ -613,12 +613,12 @@ impl GooseTaskSet {
max_wait: Duration,
) -> Result<Self, GooseError> {
trace!(
"{} set_wait time: min: {}ms max: {}ms",
"{} set_wait time: min: {:?} max: {:?}",
self.name,
min_wait.as_millis(),
max_wait.as_millis()
min_wait,
max_wait
);
if min_wait > max_wait {
if min_wait.as_millis() > max_wait.as_millis() {
return Err(GooseError::InvalidWaitTime {
min_wait,
max_wait,
Expand All @@ -627,7 +627,7 @@ impl GooseTaskSet {
.to_string(),
});
}
self.task_wait.replace((min_wait, max_wait));
self.task_wait = Some((min_wait, max_wait));

Ok(self)
}
Expand Down Expand Up @@ -2714,6 +2714,7 @@ mod tests {
assert_eq!(task_set.tasks.len(), 3);
assert_eq!(task_set.weighted_tasks.len(), 0);
assert_eq!(task_set.task_sets_index, usize::max_value());
assert_eq!(task_set.task_wait, None);

// Host field can be changed.
task_set = task_set.set_host("https://bar.example.com/");
Expand Down
16 changes: 8 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@
//! helper, for example to set a timeout on this specific request:
//!
//! ```rust
//! use std::time;
//! use std::time::Duration;
//!
//! use goose::prelude::*;
//!
//! async fn loadtest_bar(user: &mut GooseUser) -> GooseTaskResult {
//! let request_builder = user.goose_get("/path/to/bar")?;
//! let _goose = user.goose_send(request_builder.timeout(time::Duration::from_secs(3)), None).await?;
//! let _goose = user.goose_send(request_builder.timeout(Duration::from_secs(3)), None).await?;
//!
//! Ok(())
//! }
Expand Down Expand Up @@ -468,8 +468,8 @@ use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;
use std::{fmt, io, time};
use std::time::{self, Duration};
use std::{fmt, io};
use tokio::fs::File;

use crate::config::{GooseConfiguration, GooseDefaults};
Expand Down Expand Up @@ -1805,9 +1805,9 @@ impl GooseAttack {
{
let sleep_delay = self.configuration.running_metrics.unwrap() * 1_000;
goose_attack_run_state.spawn_user_in_ms -= sleep_delay;
tokio::time::Duration::from_millis(sleep_delay as u64)
Duration::from_millis(sleep_delay as u64)
} else {
tokio::time::Duration::from_millis(goose_attack_run_state.spawn_user_in_ms as u64)
Duration::from_millis(goose_attack_run_state.spawn_user_in_ms as u64)
};
debug!("sleeping {:?}...", sleep_duration);
goose_attack_run_state.drift_timer =
Expand All @@ -1817,7 +1817,7 @@ impl GooseAttack {
// If enough users have been spawned, move onto the next attack phase.
if self.weighted_users.is_empty() {
// Pause a tenth of a second waiting for the final user to fully start up.
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;

if self.attack_mode == AttackMode::Worker {
info!(
Expand Down Expand Up @@ -2050,7 +2050,7 @@ impl GooseAttack {
if self.configuration.no_autostart {
// Sleep then check for further instructions.
if goose_attack_run_state.idle_status_displayed {
let sleep_duration = tokio::time::Duration::from_millis(250);
let sleep_duration = Duration::from_millis(250);
debug!("sleeping {:?}...", sleep_duration);
goose_attack_run_state.drift_timer = util::sleep_minus_drift(
sleep_duration,
Expand Down
144 changes: 85 additions & 59 deletions src/user.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use futures::future::Fuse;
use futures::{pin_mut, select, FutureExt};
use rand::Rng;
use std::time::{self, Duration, Instant};
use std::time::{self, Duration};

use crate::get_worker_id;
use crate::goose::{GooseTaskFunction, GooseTaskSet, GooseUser, GooseUserCommand};
Expand All @@ -10,7 +8,7 @@ use crate::metrics::{GooseMetric, GooseTaskMetric};

pub(crate) async fn user_main(
thread_number: usize,
mut thread_task_set: GooseTaskSet,
thread_task_set: GooseTaskSet,
mut thread_user: GooseUser,
thread_receiver: flume::Receiver<GooseUserCommand>,
worker: bool,
Expand Down Expand Up @@ -52,64 +50,73 @@ pub(crate) async fn user_main(

// If normal tasks are defined, loop launching tasks until parent tells us to stop.
if !thread_task_set.weighted_tasks.is_empty() {
let mut task_iter = thread_task_set.weighted_tasks.iter().cycle();
let next_task_delay = Fuse::terminated();
pin_mut!(next_task_delay);

let task_wait = match thread_task_set.task_wait.take() {
Some((min, max)) if min == max => min,
Some((min, max)) => Duration::from_millis(
rand::thread_rng().gen_range(min.as_millis()..max.as_millis()) as u64,
),
None => Duration::from_millis(0),
};

next_task_delay.set(tokio::time::sleep(Duration::from_secs(0)).fuse());
loop {
select! {
_ = next_task_delay => {
let (thread_task_index, thread_task_name) = task_iter.next().unwrap();
if *thread_task_index == 0 {
// Tracks the time it takes to loop through all GooseTasks when Coordinated Omission
// Mitigation is enabled.
thread_user.update_request_cadence(thread_number).await;
}
// When there is a delay between tasks, wake every second to check for messages.
let one_second = Duration::from_secs(1);

'launch_tasks: loop {
// Tracks the time it takes to loop through all GooseTasks when Coordinated Omission
// Mitigation is enabled.
thread_user.update_request_cadence(thread_number).await;

for (thread_task_index, thread_task_name) in &thread_task_set.weighted_tasks {
// Determine which task we're going to run next.
let function = &thread_task_set.tasks[*thread_task_index].function;
debug!(
"launching on_start {} task from {}",
thread_task_name, thread_task_set.name
);
// Invoke the task function.
let _todo = invoke_task_function(
function,
&mut thread_user,
*thread_task_index,
thread_task_name,
)
.await;

if received_exit(&thread_receiver) {
break 'launch_tasks;
}

// Get a reference to the task function we're going to invoke next.
let function = &thread_task_set.tasks[*thread_task_index].function;
debug!(
"launching on_start {} task from {}",
thread_task_name, thread_task_set.name
);

let now = Instant::now();
// Invoke the task function.
let _ = invoke_task_function(
function,
&mut thread_user,
*thread_task_index,
thread_task_name,
)
.await;

let elapsed = now.elapsed();

if elapsed < task_wait {
next_task_delay.set(tokio::time::sleep(task_wait - elapsed).fuse());
} else {
next_task_delay.set(tokio::time::sleep(Duration::from_millis(0)).fuse());
}
},
message = thread_receiver.recv_async().fuse() => {
match message {
// Time to exit, break out of launch_tasks loop.
Err(_) | Ok(GooseUserCommand::Exit) => {
break ;
}
Ok(command) => {
debug!("ignoring unexpected GooseUserCommand: {:?}", command);
// If the task_wait is defined, wait for a random time between tasks.
if let Some((min, max)) = thread_task_set.task_wait {
let wait_time = rand::thread_rng().gen_range(min..max).as_millis();
// Counter to track how long we've slept, waking regularly to check for messages.
let mut slept: u128 = 0;
// Wake every second to check if the parent thread has told us to exit.
let mut in_sleep_loop = true;
// Track the time slept for Coordinated Omission Mitigation.
let sleep_timer = time::Instant::now();

while in_sleep_loop {
if received_exit(&thread_receiver) {
break 'launch_tasks;
}

let sleep_duration = if wait_time - slept >= 1000 {
slept += 1000;
if slept >= wait_time {
// Break out of sleep loop after next sleep.
in_sleep_loop = false;
}
one_second
} else {
slept += wait_time;
// Break out of sleep loop after next sleep.
in_sleep_loop = false;
Duration::from_millis((wait_time - slept) as u64)
};

debug!(
"user {} from {} sleeping {:?} ...",
thread_number, thread_task_set.name, sleep_duration
);

tokio::time::sleep(sleep_duration).await;
}
// Track how much time the GooseUser sleeps during this loop through all GooseTasks,
// used by Coordinated Omission Mitigation.
thread_user.slept += (time::Instant::now() - sleep_timer).as_millis() as u64;
}
}
}
Expand Down Expand Up @@ -152,6 +159,25 @@ pub(crate) async fn user_main(
}
}

// Determine if the parent has sent a GooseUserCommand::Exit message.
fn received_exit(thread_receiver: &flume::Receiver<GooseUserCommand>) -> bool {
let mut message = thread_receiver.try_recv();
while message.is_ok() {
match message.unwrap() {
// GooseUserCommand::Exit received.
GooseUserCommand::Exit => {
return true;
}
command => {
debug!("ignoring unexpected GooseUserCommand: {:?}", command);
}
}
message = thread_receiver.try_recv();
}
// GooseUserCommand::Exit not received.
false
}

// Invoke the task function, collecting task metrics.
async fn invoke_task_function(
function: &GooseTaskFunction,
Expand Down
6 changes: 3 additions & 3 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ pub fn parse_timespan(time_str: &str) -> usize {
///
/// // Do other stuff, in this case sleep 250 milliseconds. This is
/// // the "drift" that will be subtracted from the sleep time later.
/// tokio::time::sleep(tokio::time::Duration::from_millis(250));
/// tokio::time::sleep(std::time::Duration::from_millis(250));
///
/// // Sleep for 1 second minus the time spent doing other stuff.
/// drift_timer = util::sleep_minus_drift(
/// tokio::time::Duration::from_secs(1),
/// std::time::Duration::from_secs(1),
/// drift_timer,
/// ).await;
///
Expand All @@ -98,7 +98,7 @@ pub fn parse_timespan(time_str: &str) -> usize {
/// }
/// ```
pub async fn sleep_minus_drift(
duration: tokio::time::Duration,
duration: std::time::Duration,
drift: tokio::time::Instant,
) -> tokio::time::Instant {
match duration.checked_sub(drift.elapsed()) {
Expand Down