Skip to content

Commit

Permalink
Merge pull request #655 from cgwalters/install-progress
Browse files Browse the repository at this point in the history
install: Disable fsync() in repo when pulling && improved pull progress
  • Loading branch information
cgwalters committed Jul 3, 2024
2 parents 9459a66 + d8b5df2 commit 2c4ca36
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 31 deletions.
9 changes: 5 additions & 4 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
}
}
} else {
let fetched = crate::deploy::pull(sysroot, imgref, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, imgref, opts.quiet).await?;
let kargs = crate::kargs::get_kargs(repo, &booted_deployment, fetched.as_ref())?;
let staged_digest = staged_image.as_ref().map(|s| s.image_digest.as_str());
let fetched_digest = fetched.manifest_digest.as_str();
Expand Down Expand Up @@ -637,7 +637,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;

let fetched = crate::deploy::pull(sysroot, &target, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, &target, opts.quiet).await?;
let kargs = crate::kargs::get_kargs(repo, &booted_deployment, fetched.as_ref())?;

if !opts.retain {
Expand Down Expand Up @@ -671,6 +671,8 @@ async fn rollback(_opts: RollbackOpts) -> Result<()> {
#[context("Editing spec")]
async fn edit(opts: EditOpts) -> Result<()> {
let sysroot = &get_locked_sysroot().await?;
let repo = &sysroot.repo();

let (booted_deployment, _deployments, host) =
crate::status::get_status_require_booted(sysroot)?;
let new_host: Host = if let Some(filename) = opts.filename {
Expand All @@ -697,8 +699,7 @@ async fn edit(opts: EditOpts) -> Result<()> {
return crate::deploy::rollback(sysroot).await;
}

let fetched = crate::deploy::pull(sysroot, new_spec.image, opts.quiet).await?;
let repo = &sysroot.repo();
let fetched = crate::deploy::pull(repo, new_spec.image, opts.quiet).await?;
let kargs = crate::kargs::get_kargs(repo, &booted_deployment, fetched.as_ref())?;

// TODO gc old layers here
Expand Down
104 changes: 77 additions & 27 deletions lib/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use fn_error_context::context;
use ostree::{gio, glib};
use ostree_container::OstreeImageReference;
use ostree_ext::container as ostree_container;
use ostree_ext::container::store::PrepareResult;
use ostree_ext::container::store::{ImportProgress, PrepareResult};
use ostree_ext::oci_spec::image::Descriptor;
use ostree_ext::ostree;
use ostree_ext::ostree::Deployment;
use ostree_ext::sysroot::SysrootLock;
Expand Down Expand Up @@ -112,34 +113,76 @@ pub(crate) fn check_bootc_label(config: &ostree_ext::oci_spec::image::ImageConfi
}
}

fn descriptor_of_progress(p: &ImportProgress) -> &Descriptor {
match p {
ImportProgress::OstreeChunkStarted(l) => l,
ImportProgress::OstreeChunkCompleted(l) => l,
ImportProgress::DerivedLayerStarted(l) => l,
ImportProgress::DerivedLayerCompleted(l) => l,
}
}

fn prefix_of_progress(p: &ImportProgress) -> &'static str {
match p {
ImportProgress::OstreeChunkStarted(_) | ImportProgress::OstreeChunkCompleted(_) => {
"ostree chunk"
}
ImportProgress::DerivedLayerStarted(_) | ImportProgress::DerivedLayerCompleted(_) => {
"layer"
}
}
}

/// Write container fetch progress to standard output.
async fn handle_layer_progress_print(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
total_layers: usize,
n_layers_fetched: &mut usize,
n_layers_to_fetch: usize,
) {
let style = indicatif::ProgressStyle::default_bar();
let pb = indicatif::ProgressBar::new(100);
pb.set_style(
style
.template("{prefix} {bytes} [{bar:20}] ({eta}) {msg}")
let start = std::time::Instant::now();
let mut total_read = 0u64;
let bar = indicatif::MultiProgress::new();
let layers_bar = bar.add(indicatif::ProgressBar::new(
n_layers_to_fetch.try_into().unwrap(),
));
let byte_bar = bar.add(indicatif::ProgressBar::new(0));
// let byte_bar = indicatif::ProgressBar::new(0);
// byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
layers_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template("{prefix} {bar} {pos}/{len} {wide_msg}")
.unwrap(),
);
layers_bar.set_prefix("Fetching layers");
layers_bar.set_message("");
byte_bar.set_prefix("Fetching");
byte_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template(
" └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}",
)
.unwrap()
);
loop {
tokio::select! {
// Always handle layer changes first.
biased;
layer = layers.recv() => {
if let Some(l) = layer {
let layer = descriptor_of_progress(&l);
let layer_size = u64::try_from(layer.size()).unwrap();
if l.is_starting() {
pb.set_position(0);
byte_bar.reset_elapsed();
byte_bar.reset_eta();
byte_bar.set_length(layer_size);
let layer_type = prefix_of_progress(&l);
let short_digest = &layer.digest()[0..21];
byte_bar.set_message(format!("{layer_type} {short_digest}"));
} else {
pb.finish();
*n_layers_fetched += 1;
byte_bar.set_position(layer_size);
layers_bar.inc(1);
total_read = total_read.saturating_add(layer_size);
}
pb.set_prefix(format!("[{}/{}]", *n_layers_fetched, total_layers));
pb.set_message(ostree_ext::cli::layer_progress_format(&l));
} else {
// If the receiver is disconnected, then we're done
break
Expand All @@ -152,23 +195,35 @@ async fn handle_layer_progress_print(
}
let bytes = layer_bytes.borrow();
if let Some(bytes) = &*bytes {
pb.set_length(bytes.total);
pb.set_position(bytes.fetched);
byte_bar.set_position(bytes.fetched);
}
}

}
}
byte_bar.finish_and_clear();
layers_bar.finish_and_clear();
if let Err(e) = bar.clear() {
tracing::warn!("clearing bar: {e}");
}
let end = std::time::Instant::now();
let elapsed = end.duration_since(start);
let persec = total_read as f64 / elapsed.as_secs_f64();
let persec = indicatif::HumanBytes(persec as u64);
println!(
"Fetched layers: {} in {} ({}/s)",
indicatif::HumanBytes(total_read),
indicatif::HumanDuration(elapsed),
persec,
);
}

/// Wrapper for pulling a container image, wiring up status output.
#[context("Pulling")]
pub(crate) async fn pull(
sysroot: &SysrootLock,
repo: &ostree::Repo,
imgref: &ImageReference,
quiet: bool,
) -> Result<Box<ImageState>> {
let repo = &sysroot.repo();
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
let mut imp = new_importer(repo, ostree_imgref).await?;
let prep = match imp.prepare().await? {
Expand All @@ -183,19 +238,14 @@ pub(crate) async fn pull(
ostree_ext::cli::print_deprecated_warning(warning).await;
}
ostree_ext::cli::print_layer_status(&prep);
let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
let n_layers_to_fetch = layers_to_fetch.len();
let printer = (!quiet).then(|| {
let layer_progress = imp.request_progress();
let layer_byte_progress = imp.request_layer_progress();
let total_layers = prep.layers_to_fetch().count();
let mut n_fetched = 0usize;
tokio::task::spawn(async move {
handle_layer_progress_print(
layer_progress,
layer_byte_progress,
total_layers,
&mut n_fetched,
)
.await
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch)
.await
})
});
let import = imp.import(prep).await;
Expand Down
11 changes: 11 additions & 0 deletions lib/src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use serde::{Deserialize, Serialize};
use self::baseline::InstallBlockDeviceOpts;
use crate::containerenv::ContainerExecutionInfo;
use crate::mount::Filesystem;
use crate::spec::ImageReference;
use crate::task::Task;
use crate::utils::sigpolicy_from_opts;

Expand Down Expand Up @@ -651,6 +652,16 @@ async fn initialize_ostree_root_from_self(
imgref: src_imageref,
};

// Pull the container image into the target root filesystem. Since this is
// an install path, we don't need to fsync() individual layers.
{
let spec_imgref = ImageReference::from(src_imageref.clone());
let repo = &sysroot.repo();
repo.set_disable_fsync(true);
crate::deploy::pull(repo, &spec_imgref, false).await?;
repo.set_disable_fsync(false);
}

// Load the kargs from the /usr/lib/bootc/kargs.d from the running root,
// which should be the same as the filesystem we'll deploy.
let kargsd = crate::kargs::get_kargs_in_root(container_rootfs, std::env::consts::ARCH)?;
Expand Down

0 comments on commit 2c4ca36

Please sign in to comment.