Commit 9ff40f5

Merge pull request #49 from imageworks/44-async

Convert to an Async API

rydrman authored Mar 3, 2022
2 parents 2ce4478 + 3f54b75
Showing 83 changed files with 2,410 additions and 1,505 deletions.
233 changes: 141 additions & 92 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 17 additions & 6 deletions Cargo.toml
@@ -39,6 +39,8 @@ name = "spfs-render"
 path = "src/cli/cmd_render.rs"
 
 [dependencies]
+async-trait = "0.1.52"
+async-recursion = "1.0"
 caps = "0.5.3"
 chrono = "0.4.19"
 colored = "2.0.0"
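
The two crates added at the top of this hunk cover gaps in 2022-era stable Rust: traits could not contain async fns, and an async fn could not call itself without boxing its future. A sketch of what each macro enables (assumed usage for illustration, not spfs's actual traits):

use async_recursion::async_recursion;
use async_trait::async_trait;

#[async_trait]
trait Repository {
    // the macro desugars this into a boxed, Send future
    async fn has_object(&self, digest: &str) -> bool;
}

struct InMemory;

#[async_trait]
impl Repository for InMemory {
    async fn has_object(&self, digest: &str) -> bool {
        !digest.is_empty()
    }
}

// a recursive async fn has an infinitely-sized future unless boxed;
// the macro inserts the boxing for you
#[async_recursion]
async fn walk(depth: u32) -> u32 {
    if depth == 0 { 0 } else { walk(depth - 1).await + 1 }
}

#[tokio::main]
async fn main() {
    assert!(InMemory.has_object("abc").await);
    assert_eq!(walk(3).await, 3);
}
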
@@ -49,35 +51,44 @@ faccess = "0.2.3"
 futures = "0.3.9"
 gitignore = "1.0.7"
 glob = "0.3.0"
-indicatif = {version = "0.15.0", features = ["rayon"]}
+indicatif = "0.16.2"
 itertools = "0.9.0"
 libc = "0.2.80"
 nix = "0.19.0"
 palaver = "0.2.8"
 rand = "0.7.3"
-rayon = "1.5.0"
 relative-path = "1.3.2"
 ring = "0.16.15"
-rstest = "0.6.4"
 semver = "0.11.0"
 sentry = "0.21.0"
-serde = {version = "1.0.117", features = ["derive"]}
+serde = { version = "1.0.117", features = ["derive"] }
 serde_derive = "1.0.118"
 serde_json = "1.0.57"
 structopt = "0.3.21"
 strum = "0.21"
 strum_macros = "0.21"
 tar = "0.4.30"
 tempdir = "0.3.7"
+tokio = { version = "1.15", features = [
+    "fs",
+    "io-util",
+    "io-std",
+    "rt",
+    "rt-multi-thread",
+    "macros",
+    "sync",
+    "process",
+] }
+tokio-stream = "0.1.8"
 tracing = "0.1.22"
 tracing-subscriber = "0.2.15"
 unicode_reader = "1.0.1"
 url = "2.2.0"
-uuid = {version = "0.8.1", features = ["v4"]}
+uuid = { version = "0.8.1", features = ["v4"] }
 walkdir = "2.3.1"
 whoami = "0.9.0"
 thiserror = "1.0"
 
 [dev-dependencies]
-rstest = "0.6.4"
+rstest = { version = "0.12", default_features = false }
 serial_test = "0.5.1"
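
The tokio feature list maps directly onto how the crate is used: "rt" and "rt-multi-thread" provide the work-stealing runtime, "macros" provides #[tokio::main] and #[tokio::test], "fs", "io-util", and "io-std" cover async file and stream I/O, "sync" supplies channels and locks, and "process" covers async subprocess spawning. A minimal sketch (illustrative only, not spfs code):

use tokio::fs;

#[tokio::main] // "macros" plus "rt-multi-thread" at work
async fn main() -> std::io::Result<()> {
    // the "fs" feature replaces blocking std::fs calls
    let meta = fs::metadata("/tmp").await?;
    println!("/tmp is_dir = {}", meta.is_dir());
    Ok(())
}
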
4 changes: 2 additions & 2 deletions spfs.spec
@@ -42,9 +42,9 @@ done
 /usr/bin/spfs-push
 /usr/bin/spfs-pull
 /usr/bin/spfs-init
-%caps(cap_chown,cap_fowner+p) /usr/bin/spfs-render
+%caps(cap_chown,cap_fowner+ep) /usr/bin/spfs-render
 %caps(cap_sys_chroot,cap_sys_admin+ep) /usr/bin/spfs-join
-%caps(cap_setuid,cap_chown,cap_mknod,cap_sys_admin,cap_fowner+p) /usr/bin/spfs-enter
+%caps(cap_setuid,cap_chown,cap_mknod,cap_sys_admin,cap_fowner+ep) /usr/bin/spfs-enter
 
 %post
 mkdir -p /spfs
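
The spec change adds the effective bit to the file capabilities: with +p alone the binary only holds cap_chown and cap_fowner in its permitted set and must raise them programmatically, while +ep activates them at exec time. A hedged sketch using the caps crate already listed in Cargo.toml (an illustration, not spfs's startup code):

use caps::{has_cap, CapSet, Capability};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // under `setcap cap_chown+p`: permitted=true, effective=false
    // under `setcap cap_chown+ep`: both report true from the start
    let permitted = has_cap(None, CapSet::Permitted, Capability::CAP_CHOWN)?;
    let effective = has_cap(None, CapSet::Effective, Capability::CAP_CHOWN)?;
    println!("CAP_CHOWN permitted={} effective={}", permitted, effective);
    Ok(())
}
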
4 changes: 2 additions & 2 deletions src/bootstrap_test.rs
@@ -24,7 +24,7 @@ fn test_shell_initialization_startup_scripts(
     startup_cmd: &str,
     tmpdir: tempdir::TempDir,
 ) {
-    let _guard = init_logging();
+    init_logging();
     let shell_path = match which(shell) {
         Some(path) => path,
         None => {
@@ -124,7 +124,7 @@ fn test_shell_initialization_no_startup_scripts(shell: &str, tmpdir: tempdir::Te
 #[rstest(shell, case("bash"), case("tcsh"))]
 #[serial_test::serial] // env manipulation must be reliable
 fn test_find_alternate_bash(shell: &str, tmpdir: tempdir::TempDir) {
-    let _guard = init_logging();
+    init_logging();
     let original_path = std::env::var("PATH").unwrap_or_default();
     let original_shell = std::env::var("SHELL").unwrap_or_default();
     std::env::set_var("PATH", tmpdir.path());
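
Dropping the "let _guard =" binding implies init_logging() no longer returns a guard that must stay alive for the test's duration. A plausible guard-free form (assumed, since init_logging's body is not part of this diff), built on the tracing-subscriber crate from Cargo.toml:

fn init_logging() {
    // try_init() installs a process-global subscriber and returns (),
    // tolerating repeat calls across tests; contrast with
    // tracing::subscriber::set_default, whose DefaultGuard only keeps
    // the subscriber active while the guard is held
    let _ = tracing_subscriber::fmt().try_init();
}
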
157 changes: 87 additions & 70 deletions src/clean.rs
@@ -4,8 +4,7 @@
 
 use std::collections::HashSet;
 
-use indicatif::ParallelProgressIterator;
-use rayon::prelude::*;
+use tokio_stream::StreamExt;
 
 use crate::{encoding, storage, Error, Result};
 
@@ -14,14 +13,14 @@ use crate::{encoding, storage, Error, Result};
 mod clean_test;
 
 /// Clean all untagged objects from the given repo.
-pub fn clean_untagged_objects(repo: &storage::RepositoryHandle) -> Result<()> {
-    let unattached = get_all_unattached_objects(repo)?;
+pub async fn clean_untagged_objects(repo: &storage::RepositoryHandle) -> Result<()> {
+    let unattached = get_all_unattached_objects(repo).await?;
     if unattached.is_empty() {
         tracing::info!("nothing to clean!");
     } else {
         tracing::info!("removing orphaned data");
         let count = unattached.len();
-        purge_objects(&unattached.iter().collect::<Vec<_>>(), repo)?;
+        purge_objects(&unattached.iter().collect::<Vec<_>>(), repo).await?;
         tracing::info!("cleaned {} objects", count);
     }
     Ok(())
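
Making clean_untagged_objects async ripples out to every caller: a synchronous entry point now has to stand up a runtime and block on the future. A minimal sketch of that bridge, with a stub standing in for the real function:

async fn clean_untagged_objects_stub() -> Result<(), String> {
    Ok(())
}

fn main() -> Result<(), String> {
    // the "rt" and "rt-multi-thread" features provide this runtime
    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
    rt.block_on(clean_untagged_objects_stub())
}
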
@@ -31,59 +30,74 @@ pub fn clean_untagged_objects(repo: &storage::RepositoryHandle) -> Result<()> {
 ///
 /// # Errors
 /// - [`spfs::Error::IncompleteClean`]: An accumulation of any errors hit during the prune process
-pub fn purge_objects(
+pub async fn purge_objects(
     objects: &[&encoding::Digest],
     repo: &storage::RepositoryHandle,
 ) -> Result<()> {
     let repo = &repo.address();
     let style = indicatif::ProgressStyle::default_bar()
         .template(" {msg:<21} [{bar:40}] {pos:>7}/{len:7}")
         .progress_chars("=>-");
-    let bar = indicatif::ProgressBar::new(objects.len() as u64).with_style(style.clone());
-    bar.set_message("1/3 cleaning objects");
-    let mut results: Vec<_> = objects
-        .par_iter()
-        .progress_with(bar)
-        .map(|digest| {
-            let res = clean_object(repo, digest);
-            if res.is_ok() {
-                tracing::trace!(?digest, "successfully removed object");
-            }
-            res
-        })
+    let obj_count = objects.len() as u64;
+    let multibar = std::sync::Arc::new(indicatif::MultiProgress::new());
+    let obj_bar = multibar.add(indicatif::ProgressBar::new(obj_count));
+    obj_bar.set_style(style.clone());
+    obj_bar.set_message("cleaning objects");
+    let payload_bar = multibar.add(indicatif::ProgressBar::new(obj_count));
+    payload_bar.set_style(style.clone());
+    payload_bar.set_message("cleaning payloads");
+    let render_bar = multibar.add(indicatif::ProgressBar::new(obj_count));
+    render_bar.set_style(style);
+    render_bar.set_message("cleaning renders");
+    let mut errors = Vec::new();
+
+    let bars_future = tokio::task::spawn_blocking(move || multibar.join());
+    let map_err = |e| Error::String(format!("Unexpected error in clean process: {}", e));
+
+    // we still do each of these pieces separately, because we'd like
+    // to ensure that objects are removed successfully before any
+    // related payloads, etc...
+    let mut futures: futures::stream::FuturesUnordered<_> = objects
+        .iter()
+        .map(|digest| tokio::spawn(clean_object(repo.clone(), **digest)))
         .collect();
-    let bar = indicatif::ProgressBar::new(objects.len() as u64).with_style(style.clone());
-    bar.set_message("2/3 cleaning payloads");
-    results.append(
-        &mut objects
-            .par_iter()
-            .progress_with(bar)
-            .map(|digest| {
-                let res = clean_payload(repo, digest);
-                if res.is_ok() {
-                    tracing::trace!(?digest, "successfully removed payload");
-                }
-                res
-            })
-            .collect(),
-    );
-    let bar = indicatif::ProgressBar::new(objects.len() as u64).with_style(style);
-    bar.set_message("3/3 cleaning renders");
-    results.append(
-        &mut objects
-            .par_iter()
-            .progress_with(bar)
-            .map(|digest| {
-                let res = clean_render(repo, digest);
-                if res.is_ok() {
-                    tracing::trace!(?digest, "successfully removed render");
-                }
-                res
-            })
-            .collect(),
-    );
+    while let Some(result) = futures.next().await {
+        if let Err(err) = result.map_err(map_err).and_then(|e| e) {
+            errors.push(err);
+        }
+        obj_bar.inc(1);
+    }
+    obj_bar.finish();
+
+    let mut futures: futures::stream::FuturesUnordered<_> = objects
+        .iter()
+        .map(|digest| tokio::spawn(clean_payload(repo.clone(), **digest)))
+        .collect();
+    while let Some(result) = futures.next().await {
+        if let Err(err) = result.map_err(map_err).and_then(|e| e) {
+            errors.push(err);
+        }
+        payload_bar.inc(1);
+    }
+    payload_bar.finish();
 
-    let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();
+    let mut futures: futures::stream::FuturesUnordered<_> = objects
+        .iter()
+        .map(|digest| tokio::spawn(clean_render(repo.clone(), **digest)))
+        .collect();
+    while let Some(result) = futures.next().await {
+        if let Err(err) = result.map_err(map_err).and_then(|e| e) {
+            errors.push(err);
        }
+        render_bar.inc(1);
+    }
+    render_bar.finish();
+
+    match bars_future.await {
+        Err(err) => tracing::warn!("{}", err),
+        Ok(Err(err)) => tracing::warn!("{}", err),
+        _ => (),
+    }
 
     if !errors.is_empty() {
         Err(Error::IncompleteClean { errors })
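
The rayon par_iter pipelines above become three sequential waves of tokio tasks: each digest's cleanup is spawned, the JoinHandles are collected into a FuturesUnordered, and results are drained in completion order while the MultiProgress bars (whose blocking join() is parked on spawn_blocking) keep drawing. A self-contained sketch of the same spawn-and-drain pattern, with a stand-in task instead of spfs code:

use futures::stream::{FuturesUnordered, StreamExt};

async fn fake_clean(n: u32) -> Result<(), String> {
    if n % 7 == 0 {
        Err(format!("failed on {}", n))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut tasks: FuturesUnordered<_> =
        (0..20u32).map(|n| tokio::spawn(fake_clean(n))).collect();
    let mut errors = Vec::new();
    while let Some(joined) = tasks.next().await {
        // outer Err: the task panicked or was cancelled;
        // inner Err: the cleanup itself failed
        if let Err(err) = joined
            .map_err(|e| format!("join error: {}", e))
            .and_then(|inner| inner)
        {
            errors.push(err);
        }
    }
    println!("{} of 20 cleanups failed", errors.len()); // prints 3
}
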
@@ -92,57 +106,59 @@ pub fn purge_objects(
     }
 }
 
-fn clean_object(repo_addr: &url::Url, digest: &encoding::Digest) -> Result<()> {
-    let mut repo = storage::open_repository(repo_addr)?;
-    let res = repo.remove_object(digest);
+async fn clean_object(repo_addr: url::Url, digest: encoding::Digest) -> Result<()> {
+    let repo = storage::open_repository(repo_addr).await?;
+    let res = repo.remove_object(digest).await;
     if let Err(Error::UnknownObject(_)) = res {
         Ok(())
     } else {
         res
     }
 }
 
-fn clean_payload(repo_addr: &url::Url, digest: &encoding::Digest) -> Result<()> {
-    let mut repo = storage::open_repository(repo_addr)?;
-    let res = repo.remove_payload(digest);
+async fn clean_payload(repo_addr: url::Url, digest: encoding::Digest) -> Result<()> {
+    let repo = storage::open_repository(repo_addr).await?;
+    let res = repo.remove_payload(digest).await;
     if let Err(Error::UnknownObject(_)) = res {
         Ok(())
     } else {
         res
     }
 }
 
-fn clean_render(repo_addr: &url::Url, digest: &encoding::Digest) -> Result<()> {
-    let repo = storage::open_repository(repo_addr)?;
+async fn clean_render(repo_addr: url::Url, digest: encoding::Digest) -> Result<()> {
+    let repo = storage::open_repository(repo_addr).await?;
     let viewer = repo.renders()?;
-    let res = viewer.remove_rendered_manifest(digest);
+    let res = viewer.remove_rendered_manifest(digest).await;
     if let Err(crate::Error::UnknownObject(_)) = res {
         Ok(())
     } else {
         res
     }
 }
 
-pub fn get_all_unattached_objects(
+pub async fn get_all_unattached_objects(
     repo: &storage::RepositoryHandle,
 ) -> Result<HashSet<encoding::Digest>> {
     tracing::info!("evaluating repository digraph");
     let mut digests = HashSet::new();
-    for digest in repo.iter_digests() {
+    let mut digest_stream = repo.iter_digests();
+    while let Some(digest) = digest_stream.next().await {
         digests.insert(digest?);
     }
-    let attached = &get_all_attached_objects(repo)?;
+    let attached = &get_all_attached_objects(repo).await?;
     Ok(digests.difference(attached).copied().collect())
 }
 
-pub fn get_all_unattached_payloads(
+pub async fn get_all_unattached_payloads(
     repo: &storage::RepositoryHandle,
 ) -> Result<HashSet<encoding::Digest>> {
     tracing::info!("searching for orphaned payloads");
     let mut orphaned_payloads = HashSet::new();
-    for digest in repo.iter_payload_digests() {
+    let mut payloads = repo.iter_payload_digests();
+    while let Some(digest) = payloads.next().await {
         let digest = digest?;
-        match repo.read_blob(&digest) {
+        match repo.read_blob(digest).await {
             Err(Error::UnknownObject(_)) => {
                 orphaned_payloads.insert(digest);
             }
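
All three clean_* helpers share one idempotency rule: a missing object counts as success for a cleaner, so Error::UnknownObject is flattened to Ok(()) while every other error propagates. The same pattern in isolation (local types, not spfs's Error):

#[derive(Debug)]
enum CleanError {
    UnknownObject(String),
    Storage(String),
}

fn ignore_unknown(res: Result<(), CleanError>) -> Result<(), CleanError> {
    match res {
        // already gone: another cleaner (or nobody) beat us to it
        Err(CleanError::UnknownObject(_)) => Ok(()),
        other => other,
    }
}

fn main() {
    let gone = Err(CleanError::UnknownObject("abc123".into()));
    assert!(ignore_unknown(gone).is_ok());
    let broken = Err(CleanError::Storage("disk full".into()));
    assert!(ignore_unknown(broken).is_err());
}
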
@@ -153,14 +169,15 @@ pub fn get_all_unattached_payloads(
     Ok(orphaned_payloads)
 }
 
-pub fn get_all_attached_objects(
+pub async fn get_all_attached_objects(
     repo: &storage::RepositoryHandle,
 ) -> Result<HashSet<encoding::Digest>> {
     let mut to_process = Vec::new();
-    for item in repo.iter_tag_streams() {
-        let (_, stream) = item?;
-        for tag in stream {
-            to_process.push(tag.target);
+    let mut tag_streams = repo.iter_tag_streams();
+    while let Some(item) = tag_streams.next().await {
+        let (_, mut stream) = item?;
+        while let Some(tag) = stream.next().await {
+            to_process.push(tag?.target);
         }
     }

@@ -173,7 +190,7 @@ pub fn get_all_attached_objects(
             continue;
         }
         tracing::debug!(?digest, "walking");
-        let obj = match repo.read_object(&digest) {
+        let obj = match repo.read_object(digest).await {
             Ok(obj) => obj,
             Err(err) => match err {
                 crate::Error::UnknownObject(err) => {
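
The recurring mechanical change in this file is iterator-to-stream: repo.iter_digests(), iter_payload_digests(), and iter_tag_streams() now yield values asynchronously, so a for loop over an iterator becomes while let Some(item) = stream.next().await, with per-item Results handled inside the loop. A stand-alone sketch using tokio-stream (a stand-in stream, not the spfs storage API):

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // stand-in for repo.iter_digests(): a stream of fallible items
    let mut digests = stream::iter(vec![Ok(1u32), Ok(2), Err("corrupt entry")]);
    let mut seen = Vec::new();
    while let Some(item) = digests.next().await {
        match item {
            Ok(d) => seen.push(d),
            Err(e) => eprintln!("skipping: {}", e),
        }
    }
    println!("collected {:?}", seen); // collected [1, 2]
}
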
