Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the progress bar to display additional metrics and custimize o… #25

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions common/src/cmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ fn obj_type(metadata: &std::fs::Metadata) -> ObjType {
#[instrument(skip(prog_track))]
#[async_recursion]
pub async fn cmp(
prog_track: &'static progress::TlsProgress,
prog_track: &'static progress::Progress,
src: &std::path::Path,
dst: &std::path::Path,
log: &LogWriter,
settings: &CmpSettings,
) -> Result<CmpSummary> {
throttle::get_token().await;
let _prog_guard = prog_track.guard();
let _prog_guard = prog_track.ops.guard();
event!(Level::DEBUG, "reading source metadata");
// it is impossible for src not exist other than user passing invalid path (which is an error)
let src_metadata = tokio::fs::symlink_metadata(src)
Expand Down Expand Up @@ -283,7 +283,7 @@ mod cmp_tests {
use super::*;

lazy_static! {
static ref PROGRESS: progress::TlsProgress = progress::TlsProgress::new();
static ref PROGRESS: progress::Progress = progress::Progress::new();
static ref NO_PRESERVE_SETTINGS: preserve::PreserveSettings = preserve::preserve_default();
static ref DO_PRESERVE_SETTINGS: preserve::PreserveSettings = preserve::preserve_all();
}
Expand Down
46 changes: 37 additions & 9 deletions common/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn is_file_type_same(md1: &std::fs::Metadata, md2: &std::fs::Metadata) -> bo

#[instrument(skip(prog_track))]
pub async fn copy_file(
prog_track: &'static progress::TlsProgress,
prog_track: &'static progress::Progress,
src: &std::path::Path,
dst: &std::path::Path,
settings: &CopySettings,
Expand Down Expand Up @@ -75,6 +75,7 @@ pub async fn copy_file(
)
{
event!(Level::DEBUG, "file is identical, skipping");
prog_track.files_unchanged.inc();
return Ok(CopySummary {
files_unchanged: 1,
..Default::default()
Expand Down Expand Up @@ -117,36 +118,42 @@ pub async fn copy_file(
.await
.with_context(|| format!("failed copying {:?} to {:?}", &src, &dst))
.map_err(|err| CopyError::new(err, copy_summary))?;
prog_track.files_copied.inc();
prog_track.bytes_copied.add(src_metadata.len());
event!(Level::DEBUG, "setting permissions");
preserve::set_file_metadata(preserve, &src_metadata, dst)
.await
.map_err(|err| CopyError::new(err, copy_summary))?;
copy_summary.files_copied += 1; // we mark files as "copied" only after all metadata is set as well
// we mark files as "copied" only after all metadata is set as well
copy_summary.bytes_copied += src_metadata.len();
copy_summary.files_copied += 1;
Ok(copy_summary)
}

#[derive(Copy, Clone, Debug, Default)]
pub struct CopySummary {
pub rm_summary: RmSummary,
pub bytes_copied: u64,
pub files_copied: usize,
pub symlinks_created: usize,
pub directories_created: usize,
pub files_unchanged: usize,
pub symlinks_unchanged: usize,
pub directories_unchanged: usize,
pub rm_summary: RmSummary,
}

impl std::ops::Add for CopySummary {
type Output = Self;
fn add(self, other: Self) -> Self {
Self {
rm_summary: self.rm_summary + other.rm_summary,
bytes_copied: self.bytes_copied + other.bytes_copied,
files_copied: self.files_copied + other.files_copied,
symlinks_created: self.symlinks_created + other.symlinks_created,
directories_created: self.directories_created + other.directories_created,
files_unchanged: self.files_unchanged + other.files_unchanged,
symlinks_unchanged: self.symlinks_unchanged + other.symlinks_unchanged,
directories_unchanged: self.directories_unchanged + other.directories_unchanged,
rm_summary: self.rm_summary + other.rm_summary,
}
}
}
Expand All @@ -155,16 +162,30 @@ impl std::fmt::Display for CopySummary {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}\nfiles copied: {}\nsymlinks created: {}\ndirectories created: {}\nfiles_unchanged: {}\ndirectories_unchanged: {}\n",
&self.rm_summary, self.files_copied, self.symlinks_created, self.directories_created, self.files_unchanged, self.directories_unchanged
"bytes copied: {}\n\
files copied: {}\n\
symlinks created: {}\n\
directories created: {}\n\
files unchanged: {}\n\
symlinks unchanged: {}\n\
directories unchanged: {}\n\
{}",
bytesize::ByteSize(self.bytes_copied),
self.files_copied,
self.symlinks_created,
self.directories_created,
self.files_unchanged,
self.symlinks_unchanged,
self.directories_unchanged,
&self.rm_summary,
)
}
}

#[instrument(skip(prog_track))]
#[async_recursion]
pub async fn copy(
prog_track: &'static progress::TlsProgress,
prog_track: &'static progress::Progress,
cwd: &std::path::Path,
src: &std::path::Path,
dst: &std::path::Path,
Expand All @@ -173,7 +194,7 @@ pub async fn copy(
mut is_fresh: bool,
) -> Result<CopySummary, CopyError> {
throttle::get_token().await;
let _prog_guard = prog_track.guard();
let _ops_guard = prog_track.ops.guard();
event!(Level::DEBUG, "reading source metadata");
let src_metadata = tokio::fs::symlink_metadata(src)
.await
Expand Down Expand Up @@ -246,6 +267,8 @@ pub async fn copy(
preserve::set_symlink_metadata(preserve, &src_metadata, dst)
.await
.map_err(|err| CopyError::new(err, Default::default()))?;
prog_track.symlinks_removed.inc();
prog_track.symlinks_created.inc();
return Ok(CopySummary {
rm_summary: RmSummary {
symlinks_removed: 1,
Expand All @@ -257,6 +280,7 @@ pub async fn copy(
}
}
event!(Level::DEBUG, "symlink already exists, skipping");
prog_track.symlinks_unchanged.inc();
return Ok(CopySummary {
symlinks_unchanged: 1,
..Default::default()
Expand Down Expand Up @@ -311,6 +335,7 @@ pub async fn copy(
};
CopyError::new(err, copy_summary)
})?;
prog_track.symlinks_created.inc();
return Ok(CopySummary {
rm_summary,
symlinks_created: 1,
Expand Down Expand Up @@ -347,6 +372,7 @@ pub async fn copy(
.map_err(|err| CopyError::new(err, Default::default()))?;
if dst_metadata.is_dir() {
event!(Level::DEBUG, "'dst' is a directory, leaving it as is");
prog_track.directories_unchanged.inc();
CopySummary {
directories_unchanged: 1,
..Default::default()
Expand Down Expand Up @@ -384,6 +410,7 @@ pub async fn copy(
})?;
// anythingg copied into dst may assume they don't need to check for conflicts
is_fresh = true;
prog_track.directories_created.inc();
CopySummary {
rm_summary,
directories_created: 1,
Expand All @@ -400,6 +427,7 @@ pub async fn copy(
} else {
// new directory created, anythingg copied into dst may assume they don't need to check for conflicts
is_fresh = true;
prog_track.directories_created.inc();
CopySummary {
directories_created: 1,
..Default::default()
Expand Down Expand Up @@ -483,7 +511,7 @@ mod copy_tests {
use super::*;

lazy_static! {
static ref PROGRESS: progress::TlsProgress = progress::TlsProgress::new();
static ref PROGRESS: progress::Progress = progress::Progress::new();
static ref NO_PRESERVE_SETTINGS: preserve::PreserveSettings = preserve::preserve_default();
static ref DO_PRESERVE_SETTINGS: preserve::PreserveSettings = preserve::preserve_all();
}
Expand Down
59 changes: 10 additions & 49 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use rm::RmSettings;
pub use rm::RmSummary;

lazy_static! {
static ref PROGRESS: progress::TlsProgress = progress::TlsProgress::new();
static ref PROGRESS: progress::Progress = progress::Progress::new();
}

struct ProgressTracker {
Expand Down Expand Up @@ -88,15 +88,13 @@ impl std::str::FromStr for ProgressType {

#[derive(Debug)]
pub struct ProgressSettings {
pub op_name: String,
pub progress_type: ProgressType,
pub progress_delay: Option<String>,
}

fn progress_bar(
lock: &std::sync::Mutex<bool>,
cvar: &std::sync::Condvar,
op_name: &str,
delay_opt: &Option<std::time::Duration>,
) {
let pbar = indicatif::ProgressBar::new_spinner();
Expand All @@ -106,23 +104,11 @@ fn progress_bar(
.unwrap()
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
);
let time_started = std::time::Instant::now();
let mut last_update = time_started;
let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
let mut is_done = lock.lock().unwrap();
loop {
let progress_status = PROGRESS.get();
let time_now = std::time::Instant::now();
let finished = progress_status.finished;
let in_progress = progress_status.started - progress_status.finished;
let avarage_rate = finished as f64 / time_started.elapsed().as_secs_f64();
let current_rate =
(finished - pbar.position()) as f64 / (time_now - last_update).as_secs_f64();
pbar.set_position(finished);
pbar.set_message(format!(
"done: {} | {}: {} | average: {:.2} items/s | current: {:.2} items/s",
finished, op_name, in_progress, avarage_rate, current_rate
));
last_update = time_now;
pbar.set_position(pbar.position() + 1); // do we need to update?
pbar.set_message(prog_printer.print().unwrap());
let result = cvar.wait_timeout(is_done, delay).unwrap();
is_done = result.0;
if *is_done {
Expand All @@ -135,33 +121,13 @@ fn progress_bar(
fn text_updates(
lock: &std::sync::Mutex<bool>,
cvar: &std::sync::Condvar,
op_name: &str,
delay_opt: &Option<std::time::Duration>,
) {
let time_started = std::time::Instant::now();
let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
let mut last_update = time_started;
let mut prev_finished = 0;
let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
let mut is_done = lock.lock().unwrap();
loop {
let progress_status = PROGRESS.get();
let time_now = std::time::Instant::now();
let finished = progress_status.finished;
let in_progress = progress_status.started - progress_status.finished;
let avarage_rate = finished as f64 / time_started.elapsed().as_secs_f64();
let current_rate =
(finished - prev_finished) as f64 / (time_now - last_update).as_secs_f64();
prev_finished = finished;
eprintln!(
"{} :: done: {} | {}: {} | average: {:.2} items/s | current: {:.2} items/s",
chrono::Local::now(),
finished,
op_name,
in_progress,
avarage_rate,
current_rate
);
last_update = time_now;
eprintln!("{}", prog_printer.print().unwrap());
let result = cvar.wait_timeout(is_done, delay).unwrap();
is_done = result.0;
if *is_done {
Expand All @@ -171,12 +137,7 @@ fn text_updates(
}

impl ProgressTracker {
pub fn new(
progress_type: ProgressType,
op_name: &str,
delay_opt: Option<std::time::Duration>,
) -> Self {
let op_name = op_name.to_string();
pub fn new(progress_type: ProgressType, delay_opt: Option<std::time::Duration>) -> Self {
let lock_cvar =
std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
let lock_cvar_clone = lock_cvar.clone();
Expand All @@ -188,9 +149,9 @@ impl ProgressTracker {
ProgressType::TextUpdates => false,
};
if interactive {
progress_bar(lock, cvar, &op_name, &delay_opt);
progress_bar(lock, cvar, &delay_opt);
} else {
text_updates(lock, cvar, &op_name, &delay_opt);
text_updates(lock, cvar, &delay_opt);
}
});
Self {
Expand Down Expand Up @@ -501,7 +462,7 @@ where
humantime::parse_duration(&delay_str)
.expect("Couldn't parse duration out of --progress-delay")
});
ProgressTracker::new(settings.progress_type, &settings.op_name, delay)
ProgressTracker::new(settings.progress_type, delay)
});
runtime.block_on(func())
};
Expand Down
12 changes: 7 additions & 5 deletions common/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ fn is_hard_link(md1: &std::fs::Metadata, md2: &std::fs::Metadata) -> bool {
&& md2.st_ino() == md1.st_ino()
}

#[instrument]
#[instrument(skip(prog_track))]
async fn hard_link_helper(
prog_track: &'static progress::TlsProgress,
prog_track: &'static progress::Progress,
src: &std::path::Path,
src_metadata: &std::fs::Metadata,
dst: &std::path::Path,
Expand All @@ -92,6 +92,7 @@ async fn hard_link_helper(
.map_err(|err| LinkError::new(err, Default::default()))?;
if is_hard_link(src_metadata, &dst_metadata) {
event!(Level::DEBUG, "no change, leaving file as is");
prog_track.hard_links_unchanged.inc();
return Ok(LinkSummary {
hard_links_unchanged: 1,
..Default::default()
Expand Down Expand Up @@ -120,14 +121,15 @@ async fn hard_link_helper(
.map_err(|err| LinkError::new(anyhow::Error::msg(err), link_summary))?;
}
}
prog_track.hard_links_created.inc();
link_summary.hard_links_created = 1;
Ok(link_summary)
}

#[instrument(skip(prog_track))]
#[async_recursion]
pub async fn link(
prog_track: &'static progress::TlsProgress,
prog_track: &'static progress::Progress,
cwd: &std::path::Path,
src: &std::path::Path,
dst: &std::path::Path,
Expand All @@ -136,7 +138,7 @@ pub async fn link(
mut is_fresh: bool,
) -> Result<LinkSummary, LinkError> {
throttle::get_token().await;
let _prog_guard = prog_track.guard();
let _prog_guard = prog_track.ops.guard();
event!(Level::DEBUG, "reading source metadata");
let src_metadata = tokio::fs::symlink_metadata(src)
.await
Expand Down Expand Up @@ -528,7 +530,7 @@ mod link_tests {
use super::*;

lazy_static! {
static ref PROGRESS: progress::TlsProgress = progress::TlsProgress::new();
static ref PROGRESS: progress::Progress = progress::Progress::new();
}

fn common_settings(dereference: bool, overwrite: bool) -> LinkSettings {
Expand Down
Loading