From 35e611f49c89426ef70c22b40934e46c588da971 Mon Sep 17 00:00:00 2001 From: Artem Medvedev Date: Sun, 7 Jul 2024 01:28:41 +0200 Subject: [PATCH 1/3] feat(wait)!: enhance `LogWaitStrategy` to wait for message appearance multiple times The interface has been changed a bit to support advanced configuration of the strategy. For example, to expect a message to appear twice in `stdout`: ```rs WaitFor::log( LogWaitStrategy::stdout("server is ready") .with_times(2), ) ``` Closes #675 --- .../src/core/containers/async_container.rs | 4 +- testcontainers/src/core/image.rs | 4 +- testcontainers/src/core/logs.rs | 28 ++++++-- testcontainers/src/core/wait/log_strategy.rs | 68 +++++++++++++++++++ testcontainers/src/core/wait/mod.rs | 40 ++++------- testcontainers/tests/async_runner.rs | 6 +- testcontainers/tests/sync_runner.rs | 6 +- testimages/src/bin/simple_web_server.rs | 1 + 8 files changed, 116 insertions(+), 41 deletions(-) create mode 100644 testcontainers/src/core/wait/log_strategy.rs diff --git a/testcontainers/src/core/containers/async_container.rs b/testcontainers/src/core/containers/async_container.rs index 3b497219..267df147 100644 --- a/testcontainers/src/core/containers/async_container.rs +++ b/testcontainers/src/core/containers/async_container.rs @@ -204,13 +204,13 @@ where match cmd_ready_condition { CmdWaitFor::StdOutMessage { message } => { exec.stdout() - .wait_for_message(&message) + .wait_for_message(&message, 1) .await .map_err(ExecError::from)?; } CmdWaitFor::StdErrMessage { message } => { exec.stderr() - .wait_for_message(&message) + .wait_for_message(&message, 1) .await .map_err(ExecError::from)?; } diff --git a/testcontainers/src/core/image.rs b/testcontainers/src/core/image.rs index 95e68d92..e0235761 100644 --- a/testcontainers/src/core/image.rs +++ b/testcontainers/src/core/image.rs @@ -39,9 +39,7 @@ where /// up. /// /// The conditions returned from this method are evaluated **in the order** they are returned. Therefore - /// you most likely want to start with a [`WaitFor::StdOutMessage`] or [`WaitFor::StdErrMessage`] and - /// potentially follow up with a [`WaitFor::Duration`] in case the container usually needs a little - /// more time before it is ready. + /// you most likely want to start with a [`WaitFor::Log`] or [`WaitFor::Http`]. fn ready_conditions(&self) -> Vec; /// Returns the environment variables that needs to be set when a container is created. diff --git a/testcontainers/src/core/logs.rs b/testcontainers/src/core/logs.rs index 611f7221..e8cdd64b 100644 --- a/testcontainers/src/core/logs.rs +++ b/testcontainers/src/core/logs.rs @@ -26,7 +26,7 @@ pub enum WaitLogError { #[derive(Copy, Clone, Debug, parse_display::Display)] #[display(style = "lowercase")] -pub(crate) enum LogSource { +pub enum LogSource { StdOut, StdErr, } @@ -42,6 +42,13 @@ impl LogSource { } impl LogFrame { + pub fn source(&self) -> LogSource { + match self { + LogFrame::StdOut(_) => LogSource::StdOut, + LogFrame::StdErr(_) => LogSource::StdErr, + } + } + pub fn bytes(&self) -> &Bytes { match self { LogFrame::StdOut(bytes) => bytes, @@ -74,23 +81,29 @@ impl WaitingStreamWrapper { pub(crate) async fn wait_for_message( &mut self, message: impl AsRef<[u8]>, + times: usize, ) -> Result<(), WaitLogError> { let msg_finder = Finder::new(message.as_ref()); let mut messages = Vec::new(); + let mut found_times = 0; while let Some(message) = self.inner.next().await.transpose()? { messages.push(message.clone()); if self.enable_cache { self.cache.push(Ok(message.clone())); } let match_found = msg_finder.find(message.as_ref()).is_some(); - if match_found { - log::debug!("Found message after comparing {} lines", messages.len()); + found_times += usize::from(match_found); // can't overflow, because of check below + if found_times == times { + log::debug!( + "Message found {times} times after comparing {} lines", + messages.len() + ); return Ok(()); } } log::warn!( - "Failed to find message '{}' after comparing {} lines.", + "Failed to find message '{}' {times} times after comparing {} lines.", String::from_utf8_lossy(message.as_ref()), messages.len() ); @@ -125,11 +138,14 @@ mod tests { Message one Message two Message three + Message three " .into())]))); - let result = log_stream.wait_for_message("Message three").await; + let result = log_stream.wait_for_message("Message three", 2).await; + assert!(result.is_ok()); - assert!(result.is_ok()) + let result = log_stream.wait_for_message("Message two", 2).await; + assert!(result.is_err()); } } diff --git a/testcontainers/src/core/wait/log_strategy.rs b/testcontainers/src/core/wait/log_strategy.rs new file mode 100644 index 00000000..6347bdbf --- /dev/null +++ b/testcontainers/src/core/wait/log_strategy.rs @@ -0,0 +1,68 @@ +use bytes::Bytes; + +use crate::{ + core::{ + client::Client, + error::WaitContainerError, + logs::{LogSource, WaitingStreamWrapper}, + wait::WaitStrategy, + }, + ContainerAsync, Image, +}; + +#[derive(Debug, Clone)] +pub struct LogWaitStrategy { + source: LogSource, + message: Bytes, + times: usize, +} + +impl LogWaitStrategy { + /// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard output logs. + /// Shortcut for `LogWaitStrategy::new(LogSource::StdOut, message)`. + pub fn stdout(message: impl AsRef<[u8]>) -> Self { + Self::new(LogSource::StdOut, message) + } + + /// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard error logs. + /// Shortcut for `LogWaitStrategy::new(LogSource::StdErr, message)`. + pub fn stderr(message: impl AsRef<[u8]>) -> Self { + Self::new(LogSource::StdErr, message) + } + + /// Create a new `LogWaitStrategy` with the given log source and message. + /// The message is expected to appear in the logs exactly once by default. + pub fn new(source: LogSource, message: impl AsRef<[u8]>) -> Self { + Self { + source, + message: Bytes::from(message.as_ref().to_vec()), + times: 1, + } + } + + /// Set the number of times the message should appear in the logs. + pub fn with_times(mut self, times: usize) -> Self { + self.times = times; + self + } +} + +impl WaitStrategy for LogWaitStrategy { + async fn wait_until_ready( + self, + client: &Client, + container: &ContainerAsync, + ) -> crate::core::error::Result<()> { + let log_stream = match self.source { + LogSource::StdOut => client.stdout_logs(container.id(), true), + LogSource::StdErr => client.stderr_logs(container.id(), true), + }; + + WaitingStreamWrapper::new(log_stream) + .wait_for_message(self.message, self.times) + .await + .map_err(WaitContainerError::from)?; + + Ok(()) + } +} diff --git a/testcontainers/src/core/wait/mod.rs b/testcontainers/src/core/wait/mod.rs index 9d081ca8..603b17e9 100644 --- a/testcontainers/src/core/wait/mod.rs +++ b/testcontainers/src/core/wait/mod.rs @@ -1,17 +1,18 @@ use std::{env::var, fmt::Debug, time::Duration}; -use bytes::Bytes; pub use health_strategy::HealthWaitStrategy; pub use http_strategy::HttpWaitStrategy; +pub use log_strategy::LogWaitStrategy; use crate::{ - core::{client::Client, error::WaitContainerError, logs::WaitingStreamWrapper}, + core::{client::Client, logs::LogSource}, ContainerAsync, Image, }; pub(crate) mod cmd_wait; pub(crate) mod health_strategy; pub(crate) mod http_strategy; +pub(crate) mod log_strategy; pub(crate) trait WaitStrategy { async fn wait_until_ready( @@ -26,10 +27,8 @@ pub(crate) trait WaitStrategy { pub enum WaitFor { /// An empty condition. Useful for default cases or fallbacks. Nothing, - /// Wait for a message on the stdout stream of the container's logs. - StdOutMessage { message: Bytes }, - /// Wait for a message on the stderr stream of the container's logs. - StdErrMessage { message: Bytes }, + /// Wait for a certain message to appear in the container's logs. + Log(LogWaitStrategy), /// Wait for a certain amount of time. Duration { length: Duration }, /// Wait for the container's status to become `healthy`. @@ -41,16 +40,17 @@ pub enum WaitFor { impl WaitFor { /// Wait for the message to appear on the container's stdout. pub fn message_on_stdout(message: impl AsRef<[u8]>) -> WaitFor { - WaitFor::StdOutMessage { - message: Bytes::from(message.as_ref().to_vec()), - } + Self::log(LogWaitStrategy::new(LogSource::StdOut, message)) } /// Wait for the message to appear on the container's stderr. pub fn message_on_stderr(message: impl AsRef<[u8]>) -> WaitFor { - WaitFor::StdErrMessage { - message: Bytes::from(message.as_ref().to_vec()), - } + Self::log(LogWaitStrategy::new(LogSource::StdOut, message)) + } + + /// Wait for the message to appear on the container's stdout. + pub fn log(log_strategy: LogWaitStrategy) -> WaitFor { + WaitFor::Log(log_strategy) } /// Wait for the container to become healthy. @@ -114,18 +114,7 @@ impl WaitStrategy for WaitFor { container: &ContainerAsync, ) -> crate::core::error::Result<()> { match self { - WaitFor::StdOutMessage { message } => { - WaitingStreamWrapper::new(client.stdout_logs(container.id(), true)) - .wait_for_message(message) - .await - .map_err(WaitContainerError::from)? - } - WaitFor::StdErrMessage { message } => { - WaitingStreamWrapper::new(client.stderr_logs(container.id(), true)) - .wait_for_message(message) - .await - .map_err(WaitContainerError::from)? - } + WaitFor::Log(strategy) => strategy.wait_until_ready(client, container).await?, WaitFor::Duration { length } => { tokio::time::sleep(length).await; } @@ -135,8 +124,7 @@ impl WaitStrategy for WaitFor { WaitFor::Http(strategy) => { strategy.wait_until_ready(client, container).await?; } - // WaitFor::Nothing => {} - _ => {} + WaitFor::Nothing => {} } Ok(()) } diff --git a/testcontainers/tests/async_runner.rs b/testcontainers/tests/async_runner.rs index 3c52f782..9ff13b1f 100644 --- a/testcontainers/tests/async_runner.rs +++ b/testcontainers/tests/async_runner.rs @@ -5,7 +5,7 @@ use reqwest::StatusCode; use testcontainers::{ core::{ logs::{consumer::logging_consumer::LoggingConsumer, LogFrame}, - wait::HttpWaitStrategy, + wait::{HttpWaitStrategy, LogWaitStrategy}, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor, }, runners::AsyncRunner, @@ -98,7 +98,9 @@ async fn async_run_exec() -> anyhow::Result<()> { let _ = pretty_env_logger::try_init(); let image = GenericImage::new("simple_web_server", "latest") - .with_wait_for(WaitFor::message_on_stdout("server is ready")) + .with_wait_for(WaitFor::log( + LogWaitStrategy::stdout("server is ready").with_times(2), + )) .with_wait_for(WaitFor::seconds(1)); let container = image.start().await?; diff --git a/testcontainers/tests/sync_runner.rs b/testcontainers/tests/sync_runner.rs index b882b4fb..75b71d8b 100644 --- a/testcontainers/tests/sync_runner.rs +++ b/testcontainers/tests/sync_runner.rs @@ -4,7 +4,7 @@ use reqwest::StatusCode; use testcontainers::{ core::{ logs::{consumer::logging_consumer::LoggingConsumer, LogFrame}, - wait::HttpWaitStrategy, + wait::{HttpWaitStrategy, LogWaitStrategy}, CmdWaitFor, ExecCommand, Host, IntoContainerPort, WaitFor, }, runners::SyncRunner, @@ -150,7 +150,9 @@ fn sync_run_exec() -> anyhow::Result<()> { let _ = pretty_env_logger::try_init(); let image = GenericImage::new("simple_web_server", "latest") - .with_wait_for(WaitFor::message_on_stdout("server is ready")) + .with_wait_for(WaitFor::log( + LogWaitStrategy::stdout("server is ready").with_times(2), + )) .with_wait_for(WaitFor::seconds(1)); let container = image.start()?; diff --git a/testimages/src/bin/simple_web_server.rs b/testimages/src/bin/simple_web_server.rs index f004a433..7127e6c2 100644 --- a/testimages/src/bin/simple_web_server.rs +++ b/testimages/src/bin/simple_web_server.rs @@ -13,6 +13,7 @@ async fn main() { let addr = SocketAddr::from(([0, 0, 0, 0], 80)); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); println!("server is ready"); + println!("server is ready"); // duplicate line to test `times` parameter of `WaitFor::Log` axum::serve(listener, app.into_make_service()) .with_graceful_shutdown(shutdown_signal()) .await From 5650548a99b91c96e896b0db5fed95e0f2995424 Mon Sep 17 00:00:00 2001 From: Artem Medvedev Date: Sun, 7 Jul 2024 01:54:12 +0200 Subject: [PATCH 2/3] feat!: introduce `ExitWaitStrategy` Allows you to wait for the container to exit, optionally with a specific exit code. Closes #676 --- testcontainers/src/core/error.rs | 2 + testcontainers/src/core/wait/exit_strategy.rs | 76 +++++++++++++++++++ testcontainers/src/core/wait/mod.rs | 12 +++ testcontainers/tests/async_runner.rs | 7 +- 4 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 testcontainers/src/core/wait/exit_strategy.rs diff --git a/testcontainers/src/core/error.rs b/testcontainers/src/core/error.rs index 79340152..1957b53c 100644 --- a/testcontainers/src/core/error.rs +++ b/testcontainers/src/core/error.rs @@ -62,6 +62,8 @@ pub enum WaitContainerError { Unhealthy, #[error("container startup timeout")] StartupTimeout, + #[error("container exited with unexpected code: expected {expected}, actual {actual:?}")] + UnexpectedExitCode { expected: i64, actual: Option }, } impl TestcontainersError { diff --git a/testcontainers/src/core/wait/exit_strategy.rs b/testcontainers/src/core/wait/exit_strategy.rs new file mode 100644 index 00000000..117611d5 --- /dev/null +++ b/testcontainers/src/core/wait/exit_strategy.rs @@ -0,0 +1,76 @@ +use std::time::Duration; + +use crate::{ + core::{client::Client, error::WaitContainerError, wait::WaitStrategy}, + ContainerAsync, Image, +}; + +#[derive(Debug, Clone)] +pub struct ExitWaitStrategy { + expected_code: Option, + poll_interval: Duration, +} + +impl ExitWaitStrategy { + /// Create a new `ExitWaitStrategy` with default settings. + pub fn new() -> Self { + Self { + expected_code: None, + poll_interval: Duration::from_millis(100), + } + } + + /// Set the poll interval for checking the container's status. + pub fn with_poll_interval(mut self, poll_interval: Duration) -> Self { + self.poll_interval = poll_interval; + self + } + + /// Set the expected exit code of the container. + pub fn with_exit_code(mut self, expected_code: i64) -> Self { + self.expected_code = Some(expected_code); + self + } +} + +impl WaitStrategy for ExitWaitStrategy { + async fn wait_until_ready( + self, + client: &Client, + container: &ContainerAsync, + ) -> crate::core::error::Result<()> { + loop { + let container_state = client + .inspect(container.id()) + .await? + .state + .ok_or(WaitContainerError::StateUnavailable)?; + + let is_running = container_state.running.unwrap_or_default(); + + if is_running { + tokio::time::sleep(self.poll_interval).await; + continue; + } + + if let Some(expected_code) = self.expected_code { + let exit_code = container_state.exit_code; + if exit_code == Some(expected_code) { + return Err(WaitContainerError::UnexpectedExitCode { + expected: expected_code, + actual: exit_code, + } + .into()); + } + } + break; + } + Ok(()) + } +} + +impl Default for ExitWaitStrategy { + fn default() -> Self { + Self::new() + } +} diff --git a/testcontainers/src/core/wait/mod.rs b/testcontainers/src/core/wait/mod.rs index 603b17e9..be233235 100644 --- a/testcontainers/src/core/wait/mod.rs +++ b/testcontainers/src/core/wait/mod.rs @@ -1,5 +1,6 @@ use std::{env::var, fmt::Debug, time::Duration}; +pub use exit_strategy::ExitWaitStrategy; pub use health_strategy::HealthWaitStrategy; pub use http_strategy::HttpWaitStrategy; pub use log_strategy::LogWaitStrategy; @@ -10,6 +11,7 @@ use crate::{ }; pub(crate) mod cmd_wait; +pub(crate) mod exit_strategy; pub(crate) mod health_strategy; pub(crate) mod http_strategy; pub(crate) mod log_strategy; @@ -35,6 +37,8 @@ pub enum WaitFor { Healthcheck(HealthWaitStrategy), /// Wait for a certain HTTP response. Http(HttpWaitStrategy), + /// Wait for the container to exit. + Exit(ExitWaitStrategy), } impl WaitFor { @@ -66,6 +70,11 @@ impl WaitFor { WaitFor::Http(http_strategy) } + /// Wait for the container to exit. + pub fn exit(exit_strategy: ExitWaitStrategy) -> WaitFor { + WaitFor::Exit(exit_strategy) + } + /// Wait for a certain amount of seconds. /// /// Generally, it's not recommended to use this method, as it's better to wait for a specific condition to be met. @@ -124,6 +133,9 @@ impl WaitStrategy for WaitFor { WaitFor::Http(strategy) => { strategy.wait_until_ready(client, container).await?; } + WaitFor::Exit(strategy) => { + strategy.wait_until_ready(client, container).await?; + } WaitFor::Nothing => {} } Ok(()) diff --git a/testcontainers/tests/async_runner.rs b/testcontainers/tests/async_runner.rs index 9ff13b1f..d221e0ad 100644 --- a/testcontainers/tests/async_runner.rs +++ b/testcontainers/tests/async_runner.rs @@ -5,7 +5,7 @@ use reqwest::StatusCode; use testcontainers::{ core::{ logs::{consumer::logging_consumer::LoggingConsumer, LogFrame}, - wait::{HttpWaitStrategy, LogWaitStrategy}, + wait::{ExitWaitStrategy, HttpWaitStrategy, LogWaitStrategy}, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor, }, runners::AsyncRunner, @@ -26,7 +26,10 @@ impl Image for HelloWorld { } fn ready_conditions(&self) -> Vec { - vec![WaitFor::message_on_stdout("Hello from Docker!")] + vec![ + WaitFor::message_on_stdout("Hello from Docker!"), + WaitFor::exit(ExitWaitStrategy::new().with_exit_code(0)), + ] } } From 717203a4401b765d9e872f7e2847b100f5781d04 Mon Sep 17 00:00:00 2001 From: Artem Medvedev Date: Sun, 7 Jul 2024 02:12:17 +0200 Subject: [PATCH 3/3] fix: condition --- testcontainers/src/core/wait/exit_strategy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testcontainers/src/core/wait/exit_strategy.rs b/testcontainers/src/core/wait/exit_strategy.rs index 117611d5..eed8f3c8 100644 --- a/testcontainers/src/core/wait/exit_strategy.rs +++ b/testcontainers/src/core/wait/exit_strategy.rs @@ -55,7 +55,7 @@ impl WaitStrategy for ExitWaitStrategy { if let Some(expected_code) = self.expected_code { let exit_code = container_state.exit_code; - if exit_code == Some(expected_code) { + if exit_code != Some(expected_code) { return Err(WaitContainerError::UnexpectedExitCode { expected: expected_code, actual: exit_code,