From 55037f2e39e18e6baacd9742aafa0e9d6949bc24 Mon Sep 17 00:00:00 2001 From: "Wang, Arron" Date: Wed, 19 Oct 2022 14:17:35 +0800 Subject: [PATCH] Add async_read support for pull_blob API This API allows blobs input stream integrate seamless with following blob processing using Rust's asynchronous IO types like async_compression. Fixed: #53 Signed-off-by: Wang, Arron --- Cargo.toml | 2 ++ src/client.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a2ccdd7a..527703c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ trust-dns = ["reqwest/trust-dns"] test-registry = [] [dependencies] +futures = "0.3" futures-util = "0.3" http = "0.2" http-auth = { version = "0.1", default_features = false } @@ -47,6 +48,7 @@ serde = { version = "1.0", features = ["derive"] } sha2 = "0.10" thiserror = "1.0" tokio = { version = "1.21", features = ["macros", "fs"] } +tokio-util = { version = "0.7.4", features = ["compat"] } tracing = { version = "0.1", features = ['log'] } unicase = "2.6" diff --git a/src/client.rs b/src/client.rs index 90477ac1..9196c8e9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -17,6 +17,7 @@ use crate::Reference; use crate::errors::{OciDistributionError, Result}; use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache}; +use futures::stream::TryStreamExt; use futures_util::future; use futures_util::stream::StreamExt; use http::HeaderValue; @@ -28,7 +29,8 @@ use serde::Serialize; use sha2::Digest; use std::collections::HashMap; use std::convert::TryFrom; -use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::{debug, trace, warn}; const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[ @@ -775,6 +777,28 @@ impl Client { Ok(()) } + /// Stream a single layer from an OCI registry. + /// + /// This is a streaming version of [`Client::pull_blob`]. + /// Returns [`AsyncRead`](tokio::io::AsyncRead). + pub async fn async_pull_blob( + &self, + image: &Reference, + digest: &str, + ) -> Result { + let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), digest); + let stream = RequestBuilderWrapper::from_client(self, |client| client.get(&url)) + .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)? + .apply_auth(image, RegistryOperation::Pull)? + .into_request_builder() + .send() + .await? + .bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); + + Ok(FuturesAsyncReadCompatExt::compat(stream.into_async_read())) + } + /// Begins a session to push an image to registry in a monolithical way /// /// Returns URL with session UUID @@ -1986,6 +2010,41 @@ mod test { } } + #[tokio::test] + async fn test_async_pull_blob() { + let mut c = Client::default(); + + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + c.auth( + &reference, + &RegistryAuth::Anonymous, + RegistryOperation::Pull, + ) + .await + .expect("authenticated"); + let (manifest, _) = c + ._pull_image_manifest(&reference) + .await + .expect("failed to pull manifest"); + + // Pull one specific layer + let mut file: Vec = Vec::new(); + let layer0 = &manifest.layers[0]; + + let mut async_reader = c + .async_pull_blob(&reference, &layer0.digest) + .await + .expect("failed to pull blob with async read"); + tokio::io::AsyncReadExt::read_to_end(&mut async_reader, &mut file) + .await + .unwrap(); + + // The manifest says how many bytes we should expect. + assert_eq!(file.len(), layer0.size as usize); + } + } + #[tokio::test] async fn test_pull() { for &image in TEST_IMAGES {