Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce log consumers #681

Merged
merged 9 commits into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions testcontainers/src/core/containers/async_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,36 @@ where
pub(crate) async fn new(
id: String,
docker_client: Arc<Client>,
image: ContainerRequest<I>,
mut container_req: ContainerRequest<I>,
network: Option<Arc<Network>>,
) -> Result<ContainerAsync<I>> {
let log_consumers = std::mem::take(&mut container_req.log_consumers);
let container = ContainerAsync {
id,
image,
image: container_req,
docker_client,
network,
dropped: false,
};

let mut logs = container.docker_client.logs(&container.id, true);
let container_id = container.id.clone();
tokio::spawn(async move {
while let Some(result) = logs.next().await {
match result {
Ok(record) => {
for consumer in &log_consumers {
consumer.accept(&record).await;
tokio::task::yield_now().await;
}
}
Err(err) => {
log::error!("Failed to read log frame for container {container_id}: {err}",);
}
}
}
});

let ready_conditions = container.image().ready_conditions();
container.block_until_ready(ready_conditions).await?;
Ok(container)
Expand Down
38 changes: 35 additions & 3 deletions testcontainers/src/core/containers/request.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
use std::{borrow::Cow, collections::BTreeMap, net::IpAddr, time::Duration};
use std::{
borrow::Cow,
collections::BTreeMap,
fmt::{Debug, Formatter},
net::IpAddr,
time::Duration,
};

use crate::{
core::{mounts::Mount, ports::ContainerPort, ContainerState, ExecCommand, WaitFor},
core::{
logs::consumer::LogConsumer, mounts::Mount, ports::ContainerPort, ContainerState,
ExecCommand, WaitFor,
},
Image, TestcontainersError,
};

/// Represents a request to start a container, allowing customization of the container.
#[must_use]
#[derive(Debug)]
pub struct ContainerRequest<I: Image> {
pub(crate) image: I,
pub(crate) overridden_cmd: Vec<String>,
Expand All @@ -24,6 +32,7 @@ pub struct ContainerRequest<I: Image> {
pub(crate) cgroupns_mode: Option<CgroupnsMode>,
pub(crate) userns_mode: Option<String>,
pub(crate) startup_timeout: Option<Duration>,
pub(crate) log_consumers: Vec<Box<dyn LogConsumer + 'static>>,
}

/// Represents a port mapping between a host's external port and the internal port of a container.
Expand Down Expand Up @@ -164,6 +173,7 @@ impl<I: Image> From<I> for ContainerRequest<I> {
cgroupns_mode: None,
userns_mode: None,
startup_timeout: None,
log_consumers: vec![],
}
}
}
Expand All @@ -184,3 +194,25 @@ impl PortMapping {
self.container_port
}
}

impl<I: Image + Debug> Debug for ContainerRequest<I> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ContainerRequest")
.field("image", &self.image)
.field("overridden_cmd", &self.overridden_cmd)
.field("image_name", &self.image_name)
.field("image_tag", &self.image_tag)
.field("container_name", &self.container_name)
.field("network", &self.network)
.field("env_vars", &self.env_vars)
.field("hosts", &self.hosts)
.field("mounts", &self.mounts)
.field("ports", &self.ports)
.field("privileged", &self.privileged)
.field("shm_size", &self.shm_size)
.field("cgroupns_mode", &self.cgroupns_mode)
.field("userns_mode", &self.userns_mode)
.field("startup_timeout", &self.startup_timeout)
.finish()
}
}
13 changes: 12 additions & 1 deletion testcontainers/src/core/image/image_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use crate::{
core::{CgroupnsMode, ContainerPort, Host, Mount, PortMapping},
core::{logs::consumer::LogConsumer, CgroupnsMode, ContainerPort, Host, Mount, PortMapping},
ContainerRequest, Image,
};

Expand Down Expand Up @@ -70,7 +70,7 @@
/// cgroup namespace mode for the container. Possible values are:
/// - [`CgroupnsMode::Private`]: the container runs in its own private cgroup namespace
/// - [`CgroupnsMode::Host`]: use the host system's cgroup namespace
/// If not specified, the daemon default is used, which can either be `\"private\"` or `\"host\"`, depending on daemon version, kernel support and configuration.

Check warning on line 73 in testcontainers/src/core/image/image_ext.rs

View workflow job for this annotation

GitHub Actions / clippy

doc list item missing indentation

warning: doc list item missing indentation --> testcontainers/src/core/image/image_ext.rs:73:9 | 73 | /// If not specified, the daemon default is used, which can either be `\"private\"` or `\"host\"`, depending on daemon version, kerne... | ^ | = help: if this is supposed to be its own paragraph, add a blank line = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_lazy_continuation = note: `#[warn(clippy::doc_lazy_continuation)]` on by default help: indent this line | 73 | /// If not specified, the daemon default is used, which can either be `\"private\"` or `\"host\"`, depending on daemon version, kernel support and configuration. | ++
fn with_cgroupns_mode(self, cgroupns_mode: CgroupnsMode) -> ContainerRequest<I>;

/// Sets the usernamespace mode for the container when usernamespace remapping option is enabled.
Expand All @@ -81,6 +81,11 @@

/// Sets the startup timeout for the container. The default is 60 seconds.
fn with_startup_timeout(self, timeout: Duration) -> ContainerRequest<I>;

/// Adds the log consumer to the container.
///
/// Allows to follow the container logs for the whole lifecycle of the container, starting from the creation.
fn with_log_consumer(self, log_consumer: impl LogConsumer + 'static) -> ContainerRequest<I>;
}

/// Implements the [`ImageExt`] trait for the every type that can be converted into a [`ContainerRequest`].
Expand Down Expand Up @@ -202,4 +207,10 @@
..container_req
}
}

fn with_log_consumer(self, log_consumer: impl LogConsumer + 'static) -> ContainerRequest<I> {
let mut container_req = self.into();
container_req.log_consumers.push(Box::new(log_consumer));
container_req
}
}
10 changes: 10 additions & 0 deletions testcontainers/src/core/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
use memchr::memmem::Finder;

pub mod consumer;
pub(crate) mod stream;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -40,6 +41,15 @@ impl LogSource {
}
}

impl LogFrame {
pub fn bytes(&self) -> &Bytes {
match self {
LogFrame::StdOut(bytes) => bytes,
LogFrame::StdErr(bytes) => bytes,
}
}
}

// TODO: extract caching functionality to a separate wrapper
pub(crate) struct WaitingStreamWrapper {
inner: BoxStream<'static, Result<Bytes, io::Error>>,
Expand Down
22 changes: 22 additions & 0 deletions testcontainers/src/core/logs/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use futures::{future::BoxFuture, FutureExt};

use crate::core::logs::LogFrame;

/// Log consumer is a trait that allows to consume log frames.
/// Consumers will be called for each log frame that is produced by the container for the whole lifecycle of the container.
pub trait LogConsumer: Send + Sync {
fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()>;
}

impl<F> LogConsumer for F
where
F: Fn(&LogFrame) + Send + Sync,
{
fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()> {
// preferably to spawn blocking task
async move {
self(record);
}
.boxed()
}
}
22 changes: 21 additions & 1 deletion testcontainers/tests/async_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::time::Duration;
use bollard::Docker;
use reqwest::StatusCode;
use testcontainers::{
core::{wait::HttpWaitStrategy, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor},
core::{
logs::LogFrame, wait::HttpWaitStrategy, CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor,
},
runners::AsyncRunner,
GenericImage, *,
};
Expand Down Expand Up @@ -166,3 +168,21 @@ async fn async_run_exec_fails_due_to_unexpected_code() -> anyhow::Result<()> {
assert!(res.is_err());
Ok(())
}

#[tokio::test]
async fn async_run_with_log_consumer() -> anyhow::Result<()> {
let _ = pretty_env_logger::try_init();

let (tx, rx) = std::sync::mpsc::sync_channel(1);
let _container = HelloWorld
.with_log_consumer(move |frame: &LogFrame| {
// notify when the expected message is found
if String::from_utf8_lossy(frame.bytes()) == "Hello from Docker!\n" {
let _ = tx.send(());
}
})
.start()
.await?;
rx.recv()?; // notification from consumer
Ok(())
}
Loading