diff --git a/lib/Cargo.toml b/lib/Cargo.toml index ef6f5e42..7edc50d1 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -12,7 +12,6 @@ rust-version = "1.74.0" [dependencies] anyhow = "1.0" containers-image-proxy = "0.5.5" -async-compression = { version = "0.4", features = ["gzip", "tokio", "zstd"] } camino = "1.0.4" chrono = "0.4.19" olpc-cjson = "0.1.1" @@ -43,6 +42,7 @@ tokio = { features = ["io-std", "time", "process", "rt", "net"], version = ">= 1 tokio-util = { features = ["io-util"], version = "0.7" } tokio-stream = { features = ["sync"], version = "0.1.8" } tracing = "0.1" +zstd = "0.13.1" indoc = { version = "2", optional = true } xshell = { version = "0.2", optional = true } diff --git a/lib/src/container/unencapsulate.rs b/lib/src/container/unencapsulate.rs index b6efb8d8..99f9f9e4 100644 --- a/lib/src/container/unencapsulate.rs +++ b/lib/src/container/unencapsulate.rs @@ -36,7 +36,7 @@ use crate::container::store::LayerProgress; use super::*; use containers_image_proxy::{ImageProxy, OpenedImage}; use fn_error_context::context; -use futures_util::{Future, FutureExt}; +use futures_util::{Future, FutureExt, TryFutureExt as _}; use oci_spec::image as oci_image; use std::sync::{Arc, Mutex}; use tokio::{ @@ -189,22 +189,76 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) - importer.unencapsulate().await } +/// Take an async AsyncBufRead and handle decompression for it, returning +/// a wrapped AsyncBufRead implementation. +/// This is implemented with a background thread using a pipe-to-self, +/// and so there is an additional Future object returned that is a "driver" +/// task and must also be checked for errors. +pub(crate) fn decompress_bridge<'a>( + src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static, + is_zstd: bool, +) -> Result<( + // This one is the input reader + impl tokio::io::AsyncBufRead + Send + Unpin + 'static, + // And this represents the worker thread doing copying + impl Future> + Send + Unpin + 'static, +)> { + // We use a plain unix pipe() because it's just a very convenient + // way to bridge arbitrarily between sync and async with a worker + // thread. Yes, it involves going through the kernel, but + // eventually we'll replace all this logic with podman anyways. + let (tx, rx) = tokio::net::unix::pipe::pipe()?; + let task = tokio::task::spawn_blocking(move || -> Result<()> { + // Convert the write half of the pipe() into a regular blocking file descriptor + let tx = tx.into_blocking_fd()?; + let mut tx = std::fs::File::from(tx); + // Convert the async input back to synchronous. + let src = tokio_util::io::SyncIoBridge::new(src); + let bufr = std::io::BufReader::new(src); + // Wrap the input in a decompressor; I originally tried to make + // this function take a function pointer, but yeah that was painful + // with the type system. + let mut src: Box = if is_zstd { + Box::new(zstd::stream::read::Decoder::new(bufr)?) + } else { + Box::new(flate2::bufread::GzDecoder::new(bufr)) + }; + // We don't care about the number of bytes copied + let _n: u64 = std::io::copy(&mut src, &mut tx)?; + Ok(()) + }) + // Flatten the nested Result> + .map(crate::tokio_util::flatten_anyhow); + // And return the pair of futures + Ok((tokio::io::BufReader::new(rx), task)) +} + /// Create a decompressor for this MIME type, given a stream of input. fn new_async_decompressor<'a>( media_type: &oci_image::MediaType, - src: impl AsyncBufRead + Send + Unpin + 'a, -) -> Result> { - match media_type { - oci_image::MediaType::ImageLayerGzip => Ok(Box::new(tokio::io::BufReader::new( - async_compression::tokio::bufread::GzipDecoder::new(src), - ))), - oci_image::MediaType::ImageLayerZstd => Ok(Box::new(tokio::io::BufReader::new( - async_compression::tokio::bufread::ZstdDecoder::new(src), - ))), - oci_image::MediaType::ImageLayer => Ok(Box::new(src)), - oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Ok(Box::new(src)), - o => Err(anyhow::anyhow!("Unhandled layer type: {}", o)), - } + src: impl AsyncBufRead + Send + Unpin + 'static, +) -> Result<( + Box, + impl Future> + Send + Unpin + 'static, +)> { + let r: ( + Box, + Box> + Send + Unpin + 'static>, + ) = match media_type { + m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => { + let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd); + let (r, driver) = decompress_bridge(src, is_zstd)?; + (Box::new(r), Box::new(driver) as _) + } + oci_image::MediaType::ImageLayer => { + (Box::new(src), Box::new(futures_util::future::ready(Ok(())))) + } + oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => { + (Box::new(src), Box::new(futures_util::future::ready(Ok(())))) + } + o => anyhow::bail!("Unhandled layer type: {}", o), + }; + Ok(r) } /// A wrapper for [`get_blob`] which fetches a layer and decompresses it. @@ -262,11 +316,13 @@ pub(crate) async fn fetch_layer_decompress<'a>( progress.send_replace(Some(status)); } }; - let reader = new_async_decompressor(media_type, readprogress)?; + let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?; + let driver = driver.and_then(|()| compression_driver); let driver = futures_util::future::join(readproxy, driver).map(|r| r.1); Ok((reader, Either::Left(driver))) } else { - let blob = new_async_decompressor(media_type, blob)?; + let (blob, compression_driver) = new_async_decompressor(media_type, blob)?; + let driver = driver.and_then(|()| compression_driver); Ok((blob, Either::Right(driver))) } } diff --git a/lib/tests/it/main.rs b/lib/tests/it/main.rs index 464cab36..835119d5 100644 --- a/lib/tests/it/main.rs +++ b/lib/tests/it/main.rs @@ -401,10 +401,11 @@ async fn test_tar_write() -> Result<()> { #[tokio::test] async fn test_tar_write_tar_layer() -> Result<()> { let fixture = Fixture::new_v1()?; - let uncompressed_tar = tokio::io::BufReader::new( - async_compression::tokio::bufread::GzipDecoder::new(EXAMPLE_TAR_LAYER), - ); - ostree_ext::tar::write_tar(fixture.destrepo(), uncompressed_tar, "test", None).await?; + let mut v = Vec::new(); + let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER)); + let _n = std::io::copy(&mut dec, &mut v)?; + let r = tokio::io::BufReader::new(std::io::Cursor::new(v)); + ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?; Ok(()) }