Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async_read support for pull_blob API #54

Merged
merged 1 commit into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ trust-dns = ["reqwest/trust-dns"]
test-registry = []

[dependencies]
futures = "0.3"
futures-util = "0.3"
http = "0.2"
http-auth = { version = "0.1", default_features = false }
Expand All @@ -47,6 +48,7 @@ serde = { version = "1.0", features = ["derive"] }
sha2 = "0.10"
thiserror = "1.0"
tokio = { version = "1.21", features = ["macros", "fs"] }
tokio-util = { version = "0.7.4", features = ["compat"] }
tracing = { version = "0.1", features = ['log'] }
unicase = "2.6"

Expand Down
61 changes: 60 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::Reference;

use crate::errors::{OciDistributionError, Result};
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
use futures::stream::TryStreamExt;
use futures_util::future;
use futures_util::stream::StreamExt;
use http::HeaderValue;
Expand All @@ -28,7 +29,8 @@ use serde::Serialize;
use sha2::Digest;
use std::collections::HashMap;
use std::convert::TryFrom;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, trace, warn};

const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[
Expand Down Expand Up @@ -775,6 +777,28 @@ impl Client {
Ok(())
}

/// Stream a single layer from an OCI registry.
///
/// This is a streaming version of [`Client::pull_blob`].
/// Returns [`AsyncRead`](tokio::io::AsyncRead).
pub async fn async_pull_blob(
&self,
image: &Reference,
digest: &str,
) -> Result<impl AsyncRead + Unpin> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is already merged but @arronwy is there a reason you chose AsyncRead over returning Stream? Generally it is better to return a stream and let the user choose how they want to consume it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @thomastaylor312 , yes, agree, Stream can easy convert to AsyncRead I also verified if we return
Result<impl Stream<Item = std::result::Result<bytes::Bytes, std::io::Error>>>
will also works.

The reason I chose AsyncRead is try to consistent with current pull_blob API with output using:
AsyncWrite and async_compression crates: https://docs.rs/async-compression/latest/async_compression/index.html also operate over AsyncWrite and AsyncBufRead streams.

It's ok for me if we decide change to return Stream to let user to choose :-).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right I had forgotten we used AsyncRead. I'll capture an issue to refactor that at some point

let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), digest);
let stream = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
.apply_auth(image, RegistryOperation::Pull)?
.into_request_builder()
.send()
.await?
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));

Ok(FuturesAsyncReadCompatExt::compat(stream.into_async_read()))
}

/// Begins a session to push an image to registry in a monolithical way
///
/// Returns URL with session UUID
Expand Down Expand Up @@ -1986,6 +2010,41 @@ mod test {
}
}

#[tokio::test]
async fn test_async_pull_blob() {
let mut c = Client::default();

for &image in TEST_IMAGES {
let reference = Reference::try_from(image).expect("failed to parse reference");
c.auth(
&reference,
&RegistryAuth::Anonymous,
RegistryOperation::Pull,
)
.await
.expect("authenticated");
let (manifest, _) = c
._pull_image_manifest(&reference)
.await
.expect("failed to pull manifest");

// Pull one specific layer
let mut file: Vec<u8> = Vec::new();
let layer0 = &manifest.layers[0];

let mut async_reader = c
.async_pull_blob(&reference, &layer0.digest)
.await
.expect("failed to pull blob with async read");
tokio::io::AsyncReadExt::read_to_end(&mut async_reader, &mut file)
.await
.unwrap();

// The manifest says how many bytes we should expect.
assert_eq!(file.len(), layer0.size as usize);
}
}

#[tokio::test]
async fn test_pull() {
for &image in TEST_IMAGES {
Expand Down