Skip to content

Commit

Permalink
Add async_read support for pull_blob API
Browse files Browse the repository at this point in the history
This API allows blobs input stream integrate seamless with
following blob processing using Rust's asynchronous IO types
like async_compression.

Fixed: #53

Signed-off-by: Wang, Arron <arron.wang@intel.com>
  • Loading branch information
arronwy committed Dec 10, 2022
1 parent fccded4 commit 55037f2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ trust-dns = ["reqwest/trust-dns"]
test-registry = []

[dependencies]
futures = "0.3"
futures-util = "0.3"
http = "0.2"
http-auth = { version = "0.1", default_features = false }
Expand All @@ -47,6 +48,7 @@ serde = { version = "1.0", features = ["derive"] }
sha2 = "0.10"
thiserror = "1.0"
tokio = { version = "1.21", features = ["macros", "fs"] }
tokio-util = { version = "0.7.4", features = ["compat"] }
tracing = { version = "0.1", features = ['log'] }
unicase = "2.6"

Expand Down
61 changes: 60 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::Reference;

use crate::errors::{OciDistributionError, Result};
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
use futures::stream::TryStreamExt;
use futures_util::future;
use futures_util::stream::StreamExt;
use http::HeaderValue;
Expand All @@ -28,7 +29,8 @@ use serde::Serialize;
use sha2::Digest;
use std::collections::HashMap;
use std::convert::TryFrom;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, trace, warn};

const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
Expand Down Expand Up @@ -775,6 +777,28 @@ impl Client {
Ok(())
}

/// Stream a single layer from an OCI registry.
///
/// This is a streaming version of [`Client::pull_blob`].
/// Returns [`AsyncRead`](tokio::io::AsyncRead).
pub async fn async_pull_blob(
&self,
image: &Reference,
digest: &str,
) -> Result<impl AsyncRead + Unpin> {
let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), digest);
let stream = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
.apply_auth(image, RegistryOperation::Pull)?
.into_request_builder()
.send()
.await?
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));

Ok(FuturesAsyncReadCompatExt::compat(stream.into_async_read()))
}

/// Begins a session to push an image to registry in a monolithical way
///
/// Returns URL with session UUID
Expand Down Expand Up @@ -1986,6 +2010,41 @@ mod test {
}
}

#[tokio::test]
async fn test_async_pull_blob() {
let mut c = Client::default();

for &image in TEST_IMAGES {
let reference = Reference::try_from(image).expect("failed to parse reference");
c.auth(
&reference,
&RegistryAuth::Anonymous,
RegistryOperation::Pull,
)
.await
.expect("authenticated");
let (manifest, _) = c
._pull_image_manifest(&reference)
.await
.expect("failed to pull manifest");

// Pull one specific layer
let mut file: Vec<u8> = Vec::new();
let layer0 = &manifest.layers[0];

let mut async_reader = c
.async_pull_blob(&reference, &layer0.digest)
.await
.expect("failed to pull blob with async read");
tokio::io::AsyncReadExt::read_to_end(&mut async_reader, &mut file)
.await
.unwrap();

// The manifest says how many bytes we should expect.
assert_eq!(file.len(), layer0.size as usize);
}
}

#[tokio::test]
async fn test_pull() {
for &image in TEST_IMAGES {
Expand Down

0 comments on commit 55037f2

Please sign in to comment.