diff --git a/testcontainers/src/core/containers/async_container.rs b/testcontainers/src/core/containers/async_container.rs index 9e038dd8..3b497219 100644 --- a/testcontainers/src/core/containers/async_container.rs +++ b/testcontainers/src/core/containers/async_container.rs @@ -53,17 +53,36 @@ where pub(crate) async fn new( id: String, docker_client: Arc, - image: ContainerRequest, + mut container_req: ContainerRequest, network: Option>, ) -> Result> { + let log_consumers = std::mem::take(&mut container_req.log_consumers); let container = ContainerAsync { id, - image, + image: container_req, docker_client, network, dropped: false, }; + let mut logs = container.docker_client.logs(&container.id, true); + let container_id = container.id.clone(); + tokio::spawn(async move { + while let Some(result) = logs.next().await { + match result { + Ok(record) => { + for consumer in &log_consumers { + consumer.accept(&record).await; + tokio::task::yield_now().await; + } + } + Err(err) => { + log::error!("Failed to read log frame for container {container_id}: {err}",); + } + } + } + }); + let ready_conditions = container.image().ready_conditions(); container.block_until_ready(ready_conditions).await?; Ok(container) diff --git a/testcontainers/src/core/containers/request.rs b/testcontainers/src/core/containers/request.rs index 8bf462bd..ddfa1a69 100644 --- a/testcontainers/src/core/containers/request.rs +++ b/testcontainers/src/core/containers/request.rs @@ -1,13 +1,21 @@ -use std::{borrow::Cow, collections::BTreeMap, net::IpAddr, time::Duration}; +use std::{ + borrow::Cow, + collections::BTreeMap, + fmt::{Debug, Formatter}, + net::IpAddr, + time::Duration, +}; use crate::{ - core::{mounts::Mount, ports::ContainerPort, ContainerState, ExecCommand, WaitFor}, + core::{ + logs::consumer::LogConsumer, mounts::Mount, ports::ContainerPort, ContainerState, + ExecCommand, WaitFor, + }, Image, TestcontainersError, }; /// Represents a request to start a container, allowing customization of the container. #[must_use] -#[derive(Debug)] pub struct ContainerRequest { pub(crate) image: I, pub(crate) overridden_cmd: Vec, @@ -24,6 +32,7 @@ pub struct ContainerRequest { pub(crate) cgroupns_mode: Option, pub(crate) userns_mode: Option, pub(crate) startup_timeout: Option, + pub(crate) log_consumers: Vec>, } /// Represents a port mapping between a host's external port and the internal port of a container. @@ -164,6 +173,7 @@ impl From for ContainerRequest { cgroupns_mode: None, userns_mode: None, startup_timeout: None, + log_consumers: vec![], } } } @@ -184,3 +194,25 @@ impl PortMapping { self.container_port } } + +impl Debug for ContainerRequest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ContainerRequest") + .field("image", &self.image) + .field("overridden_cmd", &self.overridden_cmd) + .field("image_name", &self.image_name) + .field("image_tag", &self.image_tag) + .field("container_name", &self.container_name) + .field("network", &self.network) + .field("env_vars", &self.env_vars) + .field("hosts", &self.hosts) + .field("mounts", &self.mounts) + .field("ports", &self.ports) + .field("privileged", &self.privileged) + .field("shm_size", &self.shm_size) + .field("cgroupns_mode", &self.cgroupns_mode) + .field("userns_mode", &self.userns_mode) + .field("startup_timeout", &self.startup_timeout) + .finish() + } +} diff --git a/testcontainers/src/core/image/image_ext.rs b/testcontainers/src/core/image/image_ext.rs index 0a7d8fc9..54cc5e8d 100644 --- a/testcontainers/src/core/image/image_ext.rs +++ b/testcontainers/src/core/image/image_ext.rs @@ -1,7 +1,7 @@ use std::time::Duration; use crate::{ - core::{CgroupnsMode, ContainerPort, Host, Mount, PortMapping}, + core::{logs::consumer::LogConsumer, CgroupnsMode, ContainerPort, Host, Mount, PortMapping}, ContainerRequest, Image, }; @@ -81,6 +81,11 @@ pub trait ImageExt { /// Sets the startup timeout for the container. The default is 60 seconds. fn with_startup_timeout(self, timeout: Duration) -> ContainerRequest; + + /// Adds the log consumer to the container. + /// + /// Allows to follow the container logs for the whole lifecycle of the container, starting from the creation. + fn with_log_consumer(self, log_consumer: impl LogConsumer + 'static) -> ContainerRequest; } /// Implements the [`ImageExt`] trait for the every type that can be converted into a [`ContainerRequest`]. @@ -202,4 +207,10 @@ impl>, I: Image> ImageExt for RI { ..container_req } } + + fn with_log_consumer(self, log_consumer: impl LogConsumer + 'static) -> ContainerRequest { + let mut container_req = self.into(); + container_req.log_consumers.push(Box::new(log_consumer)); + container_req + } } diff --git a/testcontainers/src/core/logs.rs b/testcontainers/src/core/logs.rs index 1f020906..611f7221 100644 --- a/testcontainers/src/core/logs.rs +++ b/testcontainers/src/core/logs.rs @@ -4,6 +4,7 @@ use bytes::Bytes; use futures::{stream::BoxStream, StreamExt}; use memchr::memmem::Finder; +pub mod consumer; pub(crate) mod stream; #[derive(Debug, Clone)] @@ -40,6 +41,15 @@ impl LogSource { } } +impl LogFrame { + pub fn bytes(&self) -> &Bytes { + match self { + LogFrame::StdOut(bytes) => bytes, + LogFrame::StdErr(bytes) => bytes, + } + } +} + // TODO: extract caching functionality to a separate wrapper pub(crate) struct WaitingStreamWrapper { inner: BoxStream<'static, Result>, diff --git a/testcontainers/src/core/logs/consumer.rs b/testcontainers/src/core/logs/consumer.rs new file mode 100644 index 00000000..8bfe3ac7 --- /dev/null +++ b/testcontainers/src/core/logs/consumer.rs @@ -0,0 +1,22 @@ +use futures::{future::BoxFuture, FutureExt}; + +use crate::core::logs::LogFrame; + +/// Log consumer is a trait that allows to consume log frames. +/// Consumers will be called for each log frame that is produced by the container for the whole lifecycle of the container. +pub trait LogConsumer: Send + Sync { + fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()>; +} + +impl LogConsumer for F +where + F: Fn(&LogFrame) + Send + Sync, +{ + fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()> { + // preferably to spawn blocking task + async move { + self(record); + } + .boxed() + } +} diff --git a/testcontainers/tests/async_runner.rs b/testcontainers/tests/async_runner.rs index dd6a09b5..151d4e80 100644 --- a/testcontainers/tests/async_runner.rs +++ b/testcontainers/tests/async_runner.rs @@ -3,7 +3,9 @@ use std::time::Duration; use bollard::Docker; use reqwest::StatusCode; use testcontainers::{ - core::{wait::HttpWaitStrategy, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor}, + core::{ + logs::LogFrame, wait::HttpWaitStrategy, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor, + }, runners::AsyncRunner, GenericImage, *, }; @@ -166,3 +168,21 @@ async fn async_run_exec_fails_due_to_unexpected_code() -> anyhow::Result<()> { assert!(res.is_err()); Ok(()) } + +#[tokio::test] +async fn async_run_with_log_consumer() -> anyhow::Result<()> { + let _ = pretty_env_logger::try_init(); + + let (tx, rx) = std::sync::mpsc::sync_channel(1); + let _container = HelloWorld + .with_log_consumer(move |frame: &LogFrame| { + // notify when the expected message is found + if String::from_utf8_lossy(frame.bytes()) == "Hello from Docker!\n" { + let _ = tx.send(()); + } + }) + .start() + .await?; + rx.recv()?; // notification from consumer + Ok(()) +}