Skip to content

Commit

Permalink
Merge pull request #368 from jeremyandrews/revert-test
Browse files Browse the repository at this point in the history
optimize fast path with Duration-based wait time
  • Loading branch information
jeremyandrews authored Oct 13, 2021
2 parents dd81446 + a4ba013 commit 3db41cf
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.14.1-dev
- [#364](https://github.com/tag1consulting/goose/pull/364) add link from the [Developer Documentation](https://docs.rs/goose) to [The Git Book](https://book.goose.rs)
- [#368](https://github.com/tag1consulting/goose/pull/368) optimize fastpath if no delay between tasks

## 0.14.0 September 15, 2021
- [#361](https://github.com/tag1consulting/goose/pull/361) convert `README.md` (and enhance) into [`The Goose Book`](https://book.goose.rs/)
Expand Down
3 changes: 1 addition & 2 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.
use std::time::Duration;

use goose::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), GooseError> {
Expand Down
1 change: 0 additions & 1 deletion examples/simple_closure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
//! limitations under the License.
use goose::prelude::*;

use std::boxed::Box;
use std::sync::Arc;
use std::time::Duration;
Expand Down
3 changes: 1 addition & 2 deletions examples/simple_with_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.
use std::time::Duration;

use goose::prelude::*;
use serde::Deserialize;
use std::time::Duration;

struct Session {
jwt_token: String,
Expand Down
5 changes: 2 additions & 3 deletions examples/umami/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ mod common;
mod english;
mod spanish;

use std::time::Duration;

use goose::prelude::*;
use std::time::Duration;

use crate::admin::*;
use crate::english::*;
Expand Down Expand Up @@ -73,7 +72,7 @@ async fn main() -> Result<(), GooseError> {
.register_taskset(
taskset!("Admin user")
.set_weight(1)?
.set_wait_time(Duration::from_secs(0), Duration::from_secs(3))?
.set_wait_time(Duration::from_secs(3), Duration::from_secs(10))?
.register_task(task!(log_in).set_on_start().set_name("auth /en/user/login"))
.register_task(task!(front_page_en).set_name("auth /").set_weight(2)?)
.register_task(task!(article_listing_en).set_name("auth /en/articles/"))
Expand Down
27 changes: 14 additions & 13 deletions src/goose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@
//!
//! ### Task Set Wait Time
//!
//! Wait time is specified as a low-high duration range. Each time a task completes in
//! the task set, the user will pause for a random number of milliseconds inclusively between
//! Wait time is specified as a low-high Duration range. Each time a task completes in the
//! task set, the user will pause for a random number of milliseconds inclusively between
//! the low and high wait times. In the following example, users loading `foo` tasks will
//! sleep 0 to 3 seconds after each task completes, and users loading `bar` tasks will
//! sleep 0 to 2.5 seconds after each task completes, and users loading `bar` tasks will
//! sleep 5 to 10 seconds after each task completes.
//!
//! ```rust
//! use goose::prelude::*;
//! use std::time::Duration;
//!
//! let mut foo_tasks = taskset!("FooTasks").set_wait_time(Duration::from_secs(0), Duration::from_secs(3)).unwrap();
//! let mut foo_tasks = taskset!("FooTasks").set_wait_time(Duration::from_secs(0), Duration::from_millis(2500)).unwrap();
//! let mut bar_tasks = taskset!("BarTasks").set_wait_time(Duration::from_secs(5), Duration::from_secs(10)).unwrap();
//! ```
//! ## Creating Tasks
Expand Down Expand Up @@ -472,7 +472,8 @@ pub struct GooseTaskSet {
pub task_sets_index: usize,
/// An integer value that controls the frequency that this task set will be assigned to a user.
pub weight: usize,
/// An range of duration indicating the interval a user will sleep after running a task.
/// A [`Duration`](https://doc.rust-lang.org/std/time/struct.Duration.html) range defining the
/// minimum and maximum time a [`GooseUser`] should sleep after running a task.
pub task_wait: Option<(Duration, Duration)>,
/// A vector containing one copy of each [`GooseTask`](./struct.GooseTask.html) that will
/// run by users running this task set.
Expand Down Expand Up @@ -591,9 +592,8 @@ impl GooseTaskSet {
self
}

/// Configure a duration per task_set to pause after running each task. The length of the pause will be randomly
/// selected from `min_wait` to `max_wait` inclusively. For example, if `min_wait` is `Duration::from_secs(0)` and
/// `max_wait` is `Duration::from_secs(2)`, the user will randomly sleep between 0 and 2_000 milliseconds after each task completes.
/// Configure a task_set to to pause after running each task. The length of the pause will be randomly
/// selected from `min_wait` to `max_wait` inclusively.
///
/// # Example
/// ```rust
Expand All @@ -613,12 +613,12 @@ impl GooseTaskSet {
max_wait: Duration,
) -> Result<Self, GooseError> {
trace!(
"{} set_wait time: min: {}ms max: {}ms",
"{} set_wait time: min: {:?} max: {:?}",
self.name,
min_wait.as_millis(),
max_wait.as_millis()
min_wait,
max_wait
);
if min_wait > max_wait {
if min_wait.as_millis() > max_wait.as_millis() {
return Err(GooseError::InvalidWaitTime {
min_wait,
max_wait,
Expand All @@ -627,7 +627,7 @@ impl GooseTaskSet {
.to_string(),
});
}
self.task_wait.replace((min_wait, max_wait));
self.task_wait = Some((min_wait, max_wait));

Ok(self)
}
Expand Down Expand Up @@ -2714,6 +2714,7 @@ mod tests {
assert_eq!(task_set.tasks.len(), 3);
assert_eq!(task_set.weighted_tasks.len(), 0);
assert_eq!(task_set.task_sets_index, usize::max_value());
assert_eq!(task_set.task_wait, None);

// Host field can be changed.
task_set = task_set.set_host("https://bar.example.com/");
Expand Down
16 changes: 8 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@
//! helper, for example to set a timeout on this specific request:
//!
//! ```rust
//! use std::time;
//! use std::time::Duration;
//!
//! use goose::prelude::*;
//!
//! async fn loadtest_bar(user: &mut GooseUser) -> GooseTaskResult {
//! let request_builder = user.goose_get("/path/to/bar")?;
//! let _goose = user.goose_send(request_builder.timeout(time::Duration::from_secs(3)), None).await?;
//! let _goose = user.goose_send(request_builder.timeout(Duration::from_secs(3)), None).await?;
//!
//! Ok(())
//! }
Expand Down Expand Up @@ -468,8 +468,8 @@ use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;
use std::{fmt, io, time};
use std::time::{self, Duration};
use std::{fmt, io};
use tokio::fs::File;

use crate::config::{GooseConfiguration, GooseDefaults};
Expand Down Expand Up @@ -1805,9 +1805,9 @@ impl GooseAttack {
{
let sleep_delay = self.configuration.running_metrics.unwrap() * 1_000;
goose_attack_run_state.spawn_user_in_ms -= sleep_delay;
tokio::time::Duration::from_millis(sleep_delay as u64)
Duration::from_millis(sleep_delay as u64)
} else {
tokio::time::Duration::from_millis(goose_attack_run_state.spawn_user_in_ms as u64)
Duration::from_millis(goose_attack_run_state.spawn_user_in_ms as u64)
};
debug!("sleeping {:?}...", sleep_duration);
goose_attack_run_state.drift_timer =
Expand All @@ -1817,7 +1817,7 @@ impl GooseAttack {
// If enough users have been spawned, move onto the next attack phase.
if self.weighted_users.is_empty() {
// Pause a tenth of a second waiting for the final user to fully start up.
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;

if self.attack_mode == AttackMode::Worker {
info!(
Expand Down Expand Up @@ -2050,7 +2050,7 @@ impl GooseAttack {
if self.configuration.no_autostart {
// Sleep then check for further instructions.
if goose_attack_run_state.idle_status_displayed {
let sleep_duration = tokio::time::Duration::from_millis(250);
let sleep_duration = Duration::from_millis(250);
debug!("sleeping {:?}...", sleep_duration);
goose_attack_run_state.drift_timer = util::sleep_minus_drift(
sleep_duration,
Expand Down
144 changes: 85 additions & 59 deletions src/user.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use futures::future::Fuse;
use futures::{pin_mut, select, FutureExt};
use rand::Rng;
use std::time::{self, Duration, Instant};
use std::time::{self, Duration};

use crate::get_worker_id;
use crate::goose::{GooseTaskFunction, GooseTaskSet, GooseUser, GooseUserCommand};
Expand All @@ -10,7 +8,7 @@ use crate::metrics::{GooseMetric, GooseTaskMetric};

pub(crate) async fn user_main(
thread_number: usize,
mut thread_task_set: GooseTaskSet,
thread_task_set: GooseTaskSet,
mut thread_user: GooseUser,
thread_receiver: flume::Receiver<GooseUserCommand>,
worker: bool,
Expand Down Expand Up @@ -52,64 +50,73 @@ pub(crate) async fn user_main(

// If normal tasks are defined, loop launching tasks until parent tells us to stop.
if !thread_task_set.weighted_tasks.is_empty() {
let mut task_iter = thread_task_set.weighted_tasks.iter().cycle();
let next_task_delay = Fuse::terminated();
pin_mut!(next_task_delay);

let task_wait = match thread_task_set.task_wait.take() {
Some((min, max)) if min == max => min,
Some((min, max)) => Duration::from_millis(
rand::thread_rng().gen_range(min.as_millis()..max.as_millis()) as u64,
),
None => Duration::from_millis(0),
};

next_task_delay.set(tokio::time::sleep(Duration::from_secs(0)).fuse());
loop {
select! {
_ = next_task_delay => {
let (thread_task_index, thread_task_name) = task_iter.next().unwrap();
if *thread_task_index == 0 {
// Tracks the time it takes to loop through all GooseTasks when Coordinated Omission
// Mitigation is enabled.
thread_user.update_request_cadence(thread_number).await;
}
// When there is a delay between tasks, wake every second to check for messages.
let one_second = Duration::from_secs(1);

'launch_tasks: loop {
// Tracks the time it takes to loop through all GooseTasks when Coordinated Omission
// Mitigation is enabled.
thread_user.update_request_cadence(thread_number).await;

for (thread_task_index, thread_task_name) in &thread_task_set.weighted_tasks {
// Determine which task we're going to run next.
let function = &thread_task_set.tasks[*thread_task_index].function;
debug!(
"launching on_start {} task from {}",
thread_task_name, thread_task_set.name
);
// Invoke the task function.
let _todo = invoke_task_function(
function,
&mut thread_user,
*thread_task_index,
thread_task_name,
)
.await;

if received_exit(&thread_receiver) {
break 'launch_tasks;
}

// Get a reference to the task function we're going to invoke next.
let function = &thread_task_set.tasks[*thread_task_index].function;
debug!(
"launching on_start {} task from {}",
thread_task_name, thread_task_set.name
);

let now = Instant::now();
// Invoke the task function.
let _ = invoke_task_function(
function,
&mut thread_user,
*thread_task_index,
thread_task_name,
)
.await;

let elapsed = now.elapsed();

if elapsed < task_wait {
next_task_delay.set(tokio::time::sleep(task_wait - elapsed).fuse());
} else {
next_task_delay.set(tokio::time::sleep(Duration::from_millis(0)).fuse());
}
},
message = thread_receiver.recv_async().fuse() => {
match message {
// Time to exit, break out of launch_tasks loop.
Err(_) | Ok(GooseUserCommand::Exit) => {
break ;
}
Ok(command) => {
debug!("ignoring unexpected GooseUserCommand: {:?}", command);
// If the task_wait is defined, wait for a random time between tasks.
if let Some((min, max)) = thread_task_set.task_wait {
let wait_time = rand::thread_rng().gen_range(min..max).as_millis();
// Counter to track how long we've slept, waking regularly to check for messages.
let mut slept: u128 = 0;
// Wake every second to check if the parent thread has told us to exit.
let mut in_sleep_loop = true;
// Track the time slept for Coordinated Omission Mitigation.
let sleep_timer = time::Instant::now();

while in_sleep_loop {
if received_exit(&thread_receiver) {
break 'launch_tasks;
}

let sleep_duration = if wait_time - slept >= 1000 {
slept += 1000;
if slept >= wait_time {
// Break out of sleep loop after next sleep.
in_sleep_loop = false;
}
one_second
} else {
slept += wait_time;
// Break out of sleep loop after next sleep.
in_sleep_loop = false;
Duration::from_millis((wait_time - slept) as u64)
};

debug!(
"user {} from {} sleeping {:?} ...",
thread_number, thread_task_set.name, sleep_duration
);

tokio::time::sleep(sleep_duration).await;
}
// Track how much time the GooseUser sleeps during this loop through all GooseTasks,
// used by Coordinated Omission Mitigation.
thread_user.slept += (time::Instant::now() - sleep_timer).as_millis() as u64;
}
}
}
Expand Down Expand Up @@ -152,6 +159,25 @@ pub(crate) async fn user_main(
}
}

// Determine if the parent has sent a GooseUserCommand::Exit message.
fn received_exit(thread_receiver: &flume::Receiver<GooseUserCommand>) -> bool {
let mut message = thread_receiver.try_recv();
while message.is_ok() {
match message.unwrap() {
// GooseUserCommand::Exit received.
GooseUserCommand::Exit => {
return true;
}
command => {
debug!("ignoring unexpected GooseUserCommand: {:?}", command);
}
}
message = thread_receiver.try_recv();
}
// GooseUserCommand::Exit not received.
false
}

// Invoke the task function, collecting task metrics.
async fn invoke_task_function(
function: &GooseTaskFunction,
Expand Down
6 changes: 3 additions & 3 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ pub fn parse_timespan(time_str: &str) -> usize {
///
/// // Do other stuff, in this case sleep 250 milliseconds. This is
/// // the "drift" that will be subtracted from the sleep time later.
/// tokio::time::sleep(tokio::time::Duration::from_millis(250));
/// tokio::time::sleep(std::time::Duration::from_millis(250));
///
/// // Sleep for 1 second minus the time spent doing other stuff.
/// drift_timer = util::sleep_minus_drift(
/// tokio::time::Duration::from_secs(1),
/// std::time::Duration::from_secs(1),
/// drift_timer,
/// ).await;
///
Expand All @@ -98,7 +98,7 @@ pub fn parse_timespan(time_str: &str) -> usize {
/// }
/// ```
pub async fn sleep_minus_drift(
duration: tokio::time::Duration,
duration: std::time::Duration,
drift: tokio::time::Instant,
) -> tokio::time::Instant {
match duration.checked_sub(drift.elapsed()) {
Expand Down

0 comments on commit 3db41cf

Please sign in to comment.