diff --git a/CHANGELOG.md b/CHANGELOG.md index 39bb688e..5b4fc912 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.14.1-dev - [#364](https://github.com/tag1consulting/goose/pull/364) add link from the [Developer Documentation](https://docs.rs/goose) to [The Git Book](https://book.goose.rs) + - [#368](https://github.com/tag1consulting/goose/pull/368) optimize fastpath if no delay between tasks ## 0.14.0 September 15, 2021 - [#361](https://github.com/tag1consulting/goose/pull/361) convert `README.md` (and enhance) into [`The Goose Book`](https://book.goose.rs/) diff --git a/examples/simple.rs b/examples/simple.rs index a9b2e56f..09f86581 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -17,9 +17,8 @@ //! See the License for the specific language governing permissions and //! limitations under the License. -use std::time::Duration; - use goose::prelude::*; +use std::time::Duration; #[tokio::main] async fn main() -> Result<(), GooseError> { diff --git a/examples/simple_closure.rs b/examples/simple_closure.rs index 9c106c2e..3519ba5e 100644 --- a/examples/simple_closure.rs +++ b/examples/simple_closure.rs @@ -17,7 +17,6 @@ //! limitations under the License. use goose::prelude::*; - use std::boxed::Box; use std::sync::Arc; use std::time::Duration; diff --git a/examples/simple_with_session.rs b/examples/simple_with_session.rs index 23537196..e077e42a 100644 --- a/examples/simple_with_session.rs +++ b/examples/simple_with_session.rs @@ -17,10 +17,9 @@ //! See the License for the specific language governing permissions and //! limitations under the License. -use std::time::Duration; - use goose::prelude::*; use serde::Deserialize; +use std::time::Duration; struct Session { jwt_token: String, diff --git a/examples/umami/main.rs b/examples/umami/main.rs index 9266285a..4b0789a2 100644 --- a/examples/umami/main.rs +++ b/examples/umami/main.rs @@ -3,9 +3,8 @@ mod common; mod english; mod spanish; -use std::time::Duration; - use goose::prelude::*; +use std::time::Duration; use crate::admin::*; use crate::english::*; @@ -73,7 +72,7 @@ async fn main() -> Result<(), GooseError> { .register_taskset( taskset!("Admin user") .set_weight(1)? - .set_wait_time(Duration::from_secs(0), Duration::from_secs(3))? + .set_wait_time(Duration::from_secs(3), Duration::from_secs(10))? .register_task(task!(log_in).set_on_start().set_name("auth /en/user/login")) .register_task(task!(front_page_en).set_name("auth /").set_weight(2)?) .register_task(task!(article_listing_en).set_name("auth /en/articles/")) diff --git a/src/goose.rs b/src/goose.rs index 9e936c1e..65c4a509 100644 --- a/src/goose.rs +++ b/src/goose.rs @@ -55,17 +55,17 @@ //! //! ### Task Set Wait Time //! -//! Wait time is specified as a low-high duration range. Each time a task completes in -//! the task set, the user will pause for a random number of milliseconds inclusively between +//! Wait time is specified as a low-high Duration range. Each time a task completes in the +//! task set, the user will pause for a random number of milliseconds inclusively between //! the low and high wait times. In the following example, users loading `foo` tasks will -//! sleep 0 to 3 seconds after each task completes, and users loading `bar` tasks will +//! sleep 0 to 2.5 seconds after each task completes, and users loading `bar` tasks will //! sleep 5 to 10 seconds after each task completes. //! //! ```rust //! use goose::prelude::*; //! use std::time::Duration; //! -//! let mut foo_tasks = taskset!("FooTasks").set_wait_time(Duration::from_secs(0), Duration::from_secs(3)).unwrap(); +//! let mut foo_tasks = taskset!("FooTasks").set_wait_time(Duration::from_secs(0), Duration::from_millis(2500)).unwrap(); //! let mut bar_tasks = taskset!("BarTasks").set_wait_time(Duration::from_secs(5), Duration::from_secs(10)).unwrap(); //! ``` //! ## Creating Tasks @@ -472,7 +472,8 @@ pub struct GooseTaskSet { pub task_sets_index: usize, /// An integer value that controls the frequency that this task set will be assigned to a user. pub weight: usize, - /// An range of duration indicating the interval a user will sleep after running a task. + /// A [`Duration`](https://doc.rust-lang.org/std/time/struct.Duration.html) range defining the + /// minimum and maximum time a [`GooseUser`] should sleep after running a task. pub task_wait: Option<(Duration, Duration)>, /// A vector containing one copy of each [`GooseTask`](./struct.GooseTask.html) that will /// run by users running this task set. @@ -591,9 +592,8 @@ impl GooseTaskSet { self } - /// Configure a duration per task_set to pause after running each task. The length of the pause will be randomly - /// selected from `min_wait` to `max_wait` inclusively. For example, if `min_wait` is `Duration::from_secs(0)` and - /// `max_wait` is `Duration::from_secs(2)`, the user will randomly sleep between 0 and 2_000 milliseconds after each task completes. + /// Configure a task_set to to pause after running each task. The length of the pause will be randomly + /// selected from `min_wait` to `max_wait` inclusively. /// /// # Example /// ```rust @@ -613,12 +613,12 @@ impl GooseTaskSet { max_wait: Duration, ) -> Result { trace!( - "{} set_wait time: min: {}ms max: {}ms", + "{} set_wait time: min: {:?} max: {:?}", self.name, - min_wait.as_millis(), - max_wait.as_millis() + min_wait, + max_wait ); - if min_wait > max_wait { + if min_wait.as_millis() > max_wait.as_millis() { return Err(GooseError::InvalidWaitTime { min_wait, max_wait, @@ -627,7 +627,7 @@ impl GooseTaskSet { .to_string(), }); } - self.task_wait.replace((min_wait, max_wait)); + self.task_wait = Some((min_wait, max_wait)); Ok(self) } @@ -2714,6 +2714,7 @@ mod tests { assert_eq!(task_set.tasks.len(), 3); assert_eq!(task_set.weighted_tasks.len(), 0); assert_eq!(task_set.task_sets_index, usize::max_value()); + assert_eq!(task_set.task_wait, None); // Host field can be changed. task_set = task_set.set_host("https://bar.example.com/"); diff --git a/src/lib.rs b/src/lib.rs index f2e165aa..276a7c78 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,13 +84,13 @@ //! helper, for example to set a timeout on this specific request: //! //! ```rust -//! use std::time; +//! use std::time::Duration; //! //! use goose::prelude::*; //! //! async fn loadtest_bar(user: &mut GooseUser) -> GooseTaskResult { //! let request_builder = user.goose_get("/path/to/bar")?; -//! let _goose = user.goose_send(request_builder.timeout(time::Duration::from_secs(3)), None).await?; +//! let _goose = user.goose_send(request_builder.timeout(Duration::from_secs(3)), None).await?; //! //! Ok(()) //! } @@ -468,8 +468,8 @@ use std::sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }; -use std::time::Duration; -use std::{fmt, io, time}; +use std::time::{self, Duration}; +use std::{fmt, io}; use tokio::fs::File; use crate::config::{GooseConfiguration, GooseDefaults}; @@ -1805,9 +1805,9 @@ impl GooseAttack { { let sleep_delay = self.configuration.running_metrics.unwrap() * 1_000; goose_attack_run_state.spawn_user_in_ms -= sleep_delay; - tokio::time::Duration::from_millis(sleep_delay as u64) + Duration::from_millis(sleep_delay as u64) } else { - tokio::time::Duration::from_millis(goose_attack_run_state.spawn_user_in_ms as u64) + Duration::from_millis(goose_attack_run_state.spawn_user_in_ms as u64) }; debug!("sleeping {:?}...", sleep_duration); goose_attack_run_state.drift_timer = @@ -1817,7 +1817,7 @@ impl GooseAttack { // If enough users have been spawned, move onto the next attack phase. if self.weighted_users.is_empty() { // Pause a tenth of a second waiting for the final user to fully start up. - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; if self.attack_mode == AttackMode::Worker { info!( @@ -2050,7 +2050,7 @@ impl GooseAttack { if self.configuration.no_autostart { // Sleep then check for further instructions. if goose_attack_run_state.idle_status_displayed { - let sleep_duration = tokio::time::Duration::from_millis(250); + let sleep_duration = Duration::from_millis(250); debug!("sleeping {:?}...", sleep_duration); goose_attack_run_state.drift_timer = util::sleep_minus_drift( sleep_duration, diff --git a/src/user.rs b/src/user.rs index db4e4ff4..98e22925 100644 --- a/src/user.rs +++ b/src/user.rs @@ -1,7 +1,5 @@ -use futures::future::Fuse; -use futures::{pin_mut, select, FutureExt}; use rand::Rng; -use std::time::{self, Duration, Instant}; +use std::time::{self, Duration}; use crate::get_worker_id; use crate::goose::{GooseTaskFunction, GooseTaskSet, GooseUser, GooseUserCommand}; @@ -10,7 +8,7 @@ use crate::metrics::{GooseMetric, GooseTaskMetric}; pub(crate) async fn user_main( thread_number: usize, - mut thread_task_set: GooseTaskSet, + thread_task_set: GooseTaskSet, mut thread_user: GooseUser, thread_receiver: flume::Receiver, worker: bool, @@ -52,64 +50,73 @@ pub(crate) async fn user_main( // If normal tasks are defined, loop launching tasks until parent tells us to stop. if !thread_task_set.weighted_tasks.is_empty() { - let mut task_iter = thread_task_set.weighted_tasks.iter().cycle(); - let next_task_delay = Fuse::terminated(); - pin_mut!(next_task_delay); - - let task_wait = match thread_task_set.task_wait.take() { - Some((min, max)) if min == max => min, - Some((min, max)) => Duration::from_millis( - rand::thread_rng().gen_range(min.as_millis()..max.as_millis()) as u64, - ), - None => Duration::from_millis(0), - }; - - next_task_delay.set(tokio::time::sleep(Duration::from_secs(0)).fuse()); - loop { - select! { - _ = next_task_delay => { - let (thread_task_index, thread_task_name) = task_iter.next().unwrap(); - if *thread_task_index == 0 { - // Tracks the time it takes to loop through all GooseTasks when Coordinated Omission - // Mitigation is enabled. - thread_user.update_request_cadence(thread_number).await; - } + // When there is a delay between tasks, wake every second to check for messages. + let one_second = Duration::from_secs(1); + + 'launch_tasks: loop { + // Tracks the time it takes to loop through all GooseTasks when Coordinated Omission + // Mitigation is enabled. + thread_user.update_request_cadence(thread_number).await; + + for (thread_task_index, thread_task_name) in &thread_task_set.weighted_tasks { + // Determine which task we're going to run next. + let function = &thread_task_set.tasks[*thread_task_index].function; + debug!( + "launching on_start {} task from {}", + thread_task_name, thread_task_set.name + ); + // Invoke the task function. + let _todo = invoke_task_function( + function, + &mut thread_user, + *thread_task_index, + thread_task_name, + ) + .await; + + if received_exit(&thread_receiver) { + break 'launch_tasks; + } - // Get a reference to the task function we're going to invoke next. - let function = &thread_task_set.tasks[*thread_task_index].function; - debug!( - "launching on_start {} task from {}", - thread_task_name, thread_task_set.name - ); - - let now = Instant::now(); - // Invoke the task function. - let _ = invoke_task_function( - function, - &mut thread_user, - *thread_task_index, - thread_task_name, - ) - .await; - - let elapsed = now.elapsed(); - - if elapsed < task_wait { - next_task_delay.set(tokio::time::sleep(task_wait - elapsed).fuse()); - } else { - next_task_delay.set(tokio::time::sleep(Duration::from_millis(0)).fuse()); - } - }, - message = thread_receiver.recv_async().fuse() => { - match message { - // Time to exit, break out of launch_tasks loop. - Err(_) | Ok(GooseUserCommand::Exit) => { - break ; - } - Ok(command) => { - debug!("ignoring unexpected GooseUserCommand: {:?}", command); + // If the task_wait is defined, wait for a random time between tasks. + if let Some((min, max)) = thread_task_set.task_wait { + let wait_time = rand::thread_rng().gen_range(min..max).as_millis(); + // Counter to track how long we've slept, waking regularly to check for messages. + let mut slept: u128 = 0; + // Wake every second to check if the parent thread has told us to exit. + let mut in_sleep_loop = true; + // Track the time slept for Coordinated Omission Mitigation. + let sleep_timer = time::Instant::now(); + + while in_sleep_loop { + if received_exit(&thread_receiver) { + break 'launch_tasks; } + + let sleep_duration = if wait_time - slept >= 1000 { + slept += 1000; + if slept >= wait_time { + // Break out of sleep loop after next sleep. + in_sleep_loop = false; + } + one_second + } else { + slept += wait_time; + // Break out of sleep loop after next sleep. + in_sleep_loop = false; + Duration::from_millis((wait_time - slept) as u64) + }; + + debug!( + "user {} from {} sleeping {:?} ...", + thread_number, thread_task_set.name, sleep_duration + ); + + tokio::time::sleep(sleep_duration).await; } + // Track how much time the GooseUser sleeps during this loop through all GooseTasks, + // used by Coordinated Omission Mitigation. + thread_user.slept += (time::Instant::now() - sleep_timer).as_millis() as u64; } } } @@ -152,6 +159,25 @@ pub(crate) async fn user_main( } } +// Determine if the parent has sent a GooseUserCommand::Exit message. +fn received_exit(thread_receiver: &flume::Receiver) -> bool { + let mut message = thread_receiver.try_recv(); + while message.is_ok() { + match message.unwrap() { + // GooseUserCommand::Exit received. + GooseUserCommand::Exit => { + return true; + } + command => { + debug!("ignoring unexpected GooseUserCommand: {:?}", command); + } + } + message = thread_receiver.try_recv(); + } + // GooseUserCommand::Exit not received. + false +} + // Invoke the task function, collecting task metrics. async fn invoke_task_function( function: &GooseTaskFunction, diff --git a/src/util.rs b/src/util.rs index dbd04a3b..28400680 100644 --- a/src/util.rs +++ b/src/util.rs @@ -82,11 +82,11 @@ pub fn parse_timespan(time_str: &str) -> usize { /// /// // Do other stuff, in this case sleep 250 milliseconds. This is /// // the "drift" that will be subtracted from the sleep time later. -/// tokio::time::sleep(tokio::time::Duration::from_millis(250)); +/// tokio::time::sleep(std::time::Duration::from_millis(250)); /// /// // Sleep for 1 second minus the time spent doing other stuff. /// drift_timer = util::sleep_minus_drift( -/// tokio::time::Duration::from_secs(1), +/// std::time::Duration::from_secs(1), /// drift_timer, /// ).await; /// @@ -98,7 +98,7 @@ pub fn parse_timespan(time_str: &str) -> usize { /// } /// ``` pub async fn sleep_minus_drift( - duration: tokio::time::Duration, + duration: std::time::Duration, drift: tokio::time::Instant, ) -> tokio::time::Instant { match duration.checked_sub(drift.elapsed()) {