diff --git a/common/src/cmp.rs b/common/src/cmp.rs index 0960658..8dde3bc 100644 --- a/common/src/cmp.rs +++ b/common/src/cmp.rs @@ -279,7 +279,6 @@ mod cmp_tests { &test_path.join("foo"), &test_path.join("bar"), &copy::CopySettings { - read_buffer: 10, dereference: false, fail_early: false, overwrite: false, @@ -294,6 +293,7 @@ mod cmp_tests { } else { &NO_PRESERVE_SETTINGS }, + false, ) .await?; Ok(tmp_dir) diff --git a/common/src/copy.rs b/common/src/copy.rs index 19b2159..4381079 100644 --- a/common/src/copy.rs +++ b/common/src/copy.rs @@ -1,6 +1,5 @@ use anyhow::{Context, Result}; use async_recursion::async_recursion; -use tokio::io::AsyncWriteExt; use tracing::{event, instrument, Level}; use crate::filecmp; @@ -10,7 +9,6 @@ use crate::rm; #[derive(Debug, Copy, Clone)] pub struct CopySettings { - pub read_buffer: usize, pub dereference: bool, pub fail_early: bool, pub overwrite: bool, @@ -33,70 +31,59 @@ pub async fn copy_file( dst: &std::path::Path, settings: &CopySettings, preserve: &preserve::PreserveSettings, + is_fresh: bool, ) -> Result { event!( Level::DEBUG, "opening 'src' for reading and 'dst' for writing" ); - let mut reader = tokio::fs::File::open(src) + let reader = tokio::fs::File::open(src) .await .with_context(|| format!("cannot open {:?} for reading", src))?; let mut rm_summary = rm::RmSummary::default(); - let mut writer = { - match tokio::fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(dst) - .await - { - Ok(writer) => writer, - Err(error) => { - if settings.overwrite && error.kind() == std::io::ErrorKind::AlreadyExists { - event!(Level::DEBUG, "file exists, check if it's identical"); - let md1 = reader.metadata().await?; - let md2 = tokio::fs::symlink_metadata(dst) - .await - .with_context(|| format!("failed reading metadata from {:?}", &dst))?; - if is_file_type_same(&md1, &md2) - && filecmp::metadata_equal(&settings.overwrite_compare, &md1, &md2) - { - event!(Level::DEBUG, "file is identical, skipping"); - return 
Ok(CopySummary { - files_unchanged: 1, - ..Default::default() - }); - } - event!(Level::DEBUG, "file is different, removing existing file"); - rm_summary = rm::rm( - prog_track, - dst, - &rm::Settings { - fail_early: settings.fail_early, - }, - ) - .await?; - tokio::fs::File::create(dst) - .await - .with_context(|| format!("cannot create file {:?}", dst))? - } else { - return Err(error).with_context(|| format!("cannot create file {:?}", dst)); - } + if !is_fresh && dst.exists() { + if settings.overwrite { + event!(Level::DEBUG, "file exists, check if it's identical"); + let md1 = reader.metadata().await?; + let md2 = tokio::fs::symlink_metadata(dst) + .await + .with_context(|| format!("failed reading metadata from {:?}", &dst))?; + if is_file_type_same(&md1, &md2) + && filecmp::metadata_equal(&settings.overwrite_compare, &md1, &md2) + { + event!(Level::DEBUG, "file is identical, skipping"); + return Ok(CopySummary { + files_unchanged: 1, + ..Default::default() + }); } + event!(Level::DEBUG, "file is different, removing existing file"); + // note tokio::fs::overwrite cannot handle this path being e.g. 
a directory + rm_summary = rm::rm( + prog_track, + dst, + &rm::Settings { + fail_early: settings.fail_early, + }, + ) + .await?; + } else { + return Err(anyhow::anyhow!( + "destination {:?} already exists, did you intend to specify --overwrite?", + dst + )); } - }; + } event!(Level::DEBUG, "copying data"); - let mut buf_reader = tokio::io::BufReader::with_capacity(settings.read_buffer, &mut reader); - tokio::io::copy_buf(&mut buf_reader, &mut writer) + tokio::fs::copy(src, dst) .await - .with_context(|| format!("failed copying data to {:?}", &dst))?; + .with_context(|| format!("failed copying {:?} to {:?}", &src, &dst))?; event!(Level::DEBUG, "setting permissions"); let src_metadata = reader .metadata() .await .with_context(|| format!("failed reading metadata from {:?}", &src))?; - if preserve.file.user_and_time.time { - writer.flush().await?; // flush all writes to avoid a race with the timestamp update - } + let writer = tokio::fs::File::open(dst).await?; preserve::set_file_metadata(preserve, &src_metadata, &writer, dst).await?; Ok(CopySummary { rm_summary, @@ -150,6 +137,7 @@ pub async fn copy( dst: &std::path::Path, settings: &CopySettings, preserve: &preserve::PreserveSettings, + mut is_fresh: bool, ) -> Result { let _guard = prog_track.guard(); event!(Level::DEBUG, "reading source metadata"); @@ -174,10 +162,13 @@ pub async fn copy( ) }) .unwrap(); - return copy(prog_track, new_cwd, &abs_link, dst, settings, preserve).await; + return copy( + prog_track, new_cwd, &abs_link, dst, settings, preserve, is_fresh, + ) + .await; } if src_metadata.is_file() { - return copy_file(prog_track, src, dst, settings, preserve).await; + return copy_file(prog_track, src, dst, settings, preserve, is_fresh).await; } if src_metadata.is_symlink() { let mut rm_summary = rm::RmSummary::default(); @@ -268,6 +259,7 @@ pub async fn copy( .with_context(|| format!("cannot open directory {:?} for reading", src))?; let mut copy_summary = { if let Err(error) = 
tokio::fs::create_dir(dst).await { + assert!(!is_fresh, "unexpected error creating directory: {:?}", &dst); if settings.overwrite && error.kind() == std::io::ErrorKind::AlreadyExists { // check if the destination is a directory - if so, leave it // @@ -299,6 +291,8 @@ pub async fn copy( tokio::fs::create_dir(dst) .await .with_context(|| format!("cannot create directory {:?}", dst))?; + // anything copied into dst may assume they don't need to check for conflicts + is_fresh = true; CopySummary { rm_summary, directories_created: 1, @@ -309,7 +303,8 @@ pub async fn copy( return Err(error).with_context(|| format!("cannot create directory {:?}", dst)); } } else { - // new directory created, no conflicts + // new directory created, anything copied into dst may assume they don't need to check for conflicts + is_fresh = true; CopySummary { directories_created: 1, ..Default::default() @@ -337,6 +332,7 @@ pub async fn copy( &dst_path, &settings, &preserve, + is_fresh, ) .await }; @@ -394,7 +390,6 @@ mod copy_tests { &test_path.join("foo"), &test_path.join("bar"), &CopySettings { - read_buffer: 10, dereference: false, fail_early: false, overwrite: false, @@ -405,6 +400,7 @@ mod copy_tests { }, }, &NO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.files_copied, 5); @@ -436,7 +432,6 @@ mod copy_tests { &test_path.join("foo"), &test_path.join("bar"), &CopySettings { - read_buffer: 5, dereference: false, fail_early: false, overwrite: false, @@ -447,6 +442,7 @@ mod copy_tests { }, }, &NO_PRESERVE_SETTINGS, + false, ) .await { @@ -512,7 +508,6 @@ mod copy_tests { &test_path.join("foo"), &test_path.join("bar"), &CopySettings { - read_buffer: 7, dereference: false, fail_early: false, overwrite: false, @@ -523,6 +518,7 @@ mod copy_tests { }, }, &NO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.files_copied, 5); @@ -574,7 +570,6 @@ mod copy_tests { &test_path.join("foo"), &test_path.join("bar"), &CopySettings { - read_buffer: 8, dereference: false, fail_early: 
false, overwrite: false, @@ -585,6 +580,7 @@ mod copy_tests { }, }, &NO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.files_copied, 5); @@ -617,7 +613,6 @@ mod copy_tests { &test_path.join("foo"), &test_path.join("bar"), &CopySettings { - read_buffer: 10, dereference: true, // <- important! fail_early: false, overwrite: false, @@ -628,6 +623,7 @@ mod copy_tests { }, }, &NO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.files_copied, 7); @@ -678,6 +674,7 @@ mod copy_tests { } else { &NO_PRESERVE_SETTINGS }, + false, ) .await?; if rcp_settings.dereference { @@ -707,7 +704,6 @@ mod copy_tests { cp_compare( &["-r"], &CopySettings { - read_buffer: 100, dereference: false, fail_early: false, overwrite: false, @@ -729,7 +725,6 @@ mod copy_tests { cp_compare( &["-r", "-p"], &CopySettings { - read_buffer: 100, dereference: false, fail_early: false, overwrite: false, @@ -751,7 +746,6 @@ mod copy_tests { cp_compare( &["-r", "-L"], &CopySettings { - read_buffer: 100, dereference: true, fail_early: false, overwrite: false, @@ -773,7 +767,6 @@ mod copy_tests { cp_compare( &["-r", "-p", "-L"], &CopySettings { - read_buffer: 100, dereference: true, fail_early: false, overwrite: false, @@ -798,7 +791,6 @@ mod copy_tests { &test_path.join("foo"), &test_path.join("bar"), &CopySettings { - read_buffer: 10, dereference: false, fail_early: false, overwrite: false, @@ -809,6 +801,7 @@ mod copy_tests { }, }, &DO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.files_copied, 5); @@ -855,7 +848,6 @@ mod copy_tests { &tmp_dir.join("foo"), &output_path, &CopySettings { - read_buffer: 10, dereference: false, fail_early: false, overwrite: true, // <- important! @@ -866,6 +858,7 @@ mod copy_tests { }, }, &DO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.files_copied, 3); @@ -924,7 +917,6 @@ mod copy_tests { &tmp_dir.join("foo"), &output_path, &CopySettings { - read_buffer: 10, dereference: false, fail_early: false, overwrite: true, // <- important! 
@@ -935,6 +927,7 @@ mod copy_tests { }, }, &DO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.rm_summary.files_removed, 1); @@ -992,7 +985,6 @@ mod copy_tests { &tmp_dir.join("foo"), &output_path, &CopySettings { - read_buffer: 10, dereference: false, fail_early: false, overwrite: true, // <- important! @@ -1003,6 +995,7 @@ mod copy_tests { }, }, &DO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.rm_summary.files_removed, 1); @@ -1063,7 +1056,6 @@ mod copy_tests { &tmp_dir.join("foo"), &output_path, &CopySettings { - read_buffer: 10, dereference: false, fail_early: false, overwrite: true, // <- important! @@ -1074,6 +1066,7 @@ mod copy_tests { }, }, &DO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.rm_summary.files_removed, 0); @@ -1108,7 +1101,6 @@ mod copy_tests { &tmp_dir.join("foo"), &tmp_dir.join("bar"), &CopySettings { - read_buffer: 10, dereference: true, // <- important! fail_early: false, overwrite: false, @@ -1119,6 +1111,7 @@ mod copy_tests { }, }, &DO_PRESERVE_SETTINGS, + false, ) .await?; assert_eq!(summary.files_copied, 13); // 0.txt, 3x bar/(1.txt, 2.txt, 3.txt), baz/(4.txt, 5.txt, 6.txt) diff --git a/common/src/lib.rs b/common/src/lib.rs index 49760aa..8f5789b 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -208,7 +208,7 @@ pub async fn copy( preserve: &preserve::PreserveSettings, ) -> Result { let cwd = std::env::current_dir()?; - copy::copy(&PROGRESS, &cwd, src, dst, settings, preserve).await + copy::copy(&PROGRESS, &cwd, src, dst, settings, preserve, false).await } pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result { @@ -222,7 +222,7 @@ pub async fn link( settings: &link::LinkSettings, ) -> Result { let cwd = std::env::current_dir()?; - link::link(&PROGRESS, &cwd, src, dst, update, settings).await + link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await } fn read_env_or_default(name: &str, default: T) -> T { diff --git a/common/src/link.rs b/common/src/link.rs index 
48994be..3b14ddf 100644 --- a/common/src/link.rs +++ b/common/src/link.rs @@ -106,6 +106,7 @@ pub async fn link( dst: &std::path::Path, update: &Option, settings: &LinkSettings, + mut is_fresh: bool, ) -> Result { let _guard = prog_track.guard(); event!(Level::DEBUG, "reading source metadata"); @@ -150,6 +151,7 @@ pub async fn link( dst, &settings.copy_settings, &RLINK_PRESERVE_SETTINGS, + is_fresh, ) .await?; return Ok(LinkSummary { @@ -176,6 +178,7 @@ pub async fn link( dst, &settings.copy_settings, &RLINK_PRESERVE_SETTINGS, + is_fresh, ) .await?, ..Default::default() @@ -192,6 +195,7 @@ pub async fn link( dst, &settings.copy_settings, &RLINK_PRESERVE_SETTINGS, + is_fresh, ) .await?; return Ok(LinkSummary { @@ -215,6 +219,7 @@ pub async fn link( dst, &settings.copy_settings, &RLINK_PRESERVE_SETTINGS, + is_fresh, ) .await?; return Ok(LinkSummary { @@ -238,6 +243,7 @@ pub async fn link( .with_context(|| format!("cannot open directory {:?} for reading", src))?; let copy_summary = { if let Err(error) = tokio::fs::create_dir(dst).await { + assert!(!is_fresh, "unexpected error creating directory: {:?}", &dst); if settings.copy_settings.overwrite && error.kind() == std::io::ErrorKind::AlreadyExists { // check if the destination is a directory - if so, leave it @@ -270,6 +276,8 @@ pub async fn link( tokio::fs::create_dir(dst) .await .with_context(|| format!("cannot create directory {:?}", dst))?; + // anything copied into dst may assume they don't need to check for conflicts + is_fresh = true; copy::CopySummary { rm_summary, directories_created: 1, @@ -280,7 +288,8 @@ pub async fn link( return Err(error).with_context(|| format!("cannot create directory {:?}", dst)); } } else { - // new directory created, no conflicts + // new directory created, anything copied into dst may assume they don't need to check for conflicts + is_fresh = true; copy::CopySummary { directories_created: 1, ..Default::default() @@ -312,6 +321,7 @@ pub async fn link( &dst_path, &update_path, 
&settings, + is_fresh, ) .await }; @@ -349,6 +359,7 @@ pub async fn link( &dst_path, &settings.copy_settings, &RLINK_PRESERVE_SETTINGS, + is_fresh, ) .await?; Ok(LinkSummary { @@ -414,7 +425,6 @@ mod link_tests { fn common_settings(dereference: bool, overwrite: bool) -> LinkSettings { LinkSettings { copy_settings: copy::CopySettings { - read_buffer: 10, dereference, fail_early: false, overwrite, @@ -444,6 +454,7 @@ mod link_tests { &test_path.join("bar"), &None, &common_settings(false, false), + false, ) .await?; assert_eq!(summary.hard_links_created, 5); @@ -471,6 +482,7 @@ mod link_tests { &test_path.join("bar"), &Some(test_path.join("foo")), &common_settings(false, false), + false, ) .await?; assert_eq!(summary.hard_links_created, 5); @@ -499,6 +511,7 @@ mod link_tests { &test_path.join("bar"), &Some(test_path.join("foo")), &common_settings(false, false), + false, ) .await?; assert_eq!(summary.hard_links_created, 0); @@ -550,6 +563,7 @@ mod link_tests { &test_path.join("bar"), &Some(test_path.join("update")), &common_settings(false, false), + false, ) .await?; assert_eq!(summary.hard_links_created, 2); @@ -583,6 +597,7 @@ mod link_tests { &test_path.join("bar"), &None, &common_settings(false, false), + false, ) .await?; assert_eq!(summary.hard_links_created, 5); @@ -630,6 +645,7 @@ mod link_tests { &output_path, &None, &common_settings(false, true), // overwrite! + false, ) .await?; assert_eq!(summary.hard_links_created, 3); @@ -689,6 +705,7 @@ mod link_tests { &output_path, &Some(tmp_dir.join("update")), &common_settings(false, true), // overwrite! + false, ) .await?; assert_eq!(summary.hard_links_created, 1); // 3.txt @@ -775,6 +792,7 @@ mod link_tests { &output_path, &None, &common_settings(false, true), // overwrite! 
+ false, ) .await?; assert_eq!(summary.hard_links_created, 4); diff --git a/filegen/src/main.rs b/filegen/src/main.rs index ee8b929..d6d1f39 100644 --- a/filegen/src/main.rs +++ b/filegen/src/main.rs @@ -2,6 +2,7 @@ use anyhow::{Context, Result}; use async_recursion::async_recursion; use rand::Rng; use structopt::StructOpt; +use tokio::io::AsyncWriteExt; #[derive(Debug)] struct Dirwidth { @@ -20,7 +21,7 @@ impl std::str::FromStr for Dirwidth { } #[derive(StructOpt, Debug)] -#[structopt(name = "rcp")] +#[structopt(name = "filegen")] struct Args { /// Root directory where files are generated #[structopt(parse(from_os_str))] @@ -46,6 +47,34 @@ struct Args { /// Size of each file. Accepts suffixes like "1K", "1M", "1G" #[structopt()] filesize: String, + + /// Size of the buffer used to write to each file. Accepts suffixes like "1K", "1M", "1G" + #[structopt(default_value = "4K")] + bufsize: String, +} + +async fn write_file(path: std::path::PathBuf, mut filesize: usize, bufsize: usize) -> Result<()> { + let mut bytes = vec![0u8; bufsize]; + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(false) + .open(&path) + .await + .context(format!("Error opening {:?}", &path))?; + while filesize > 0 { + { + // make sure rng falls out of scope before await + let mut rng = rand::thread_rng(); + rng.fill(&mut bytes[..]); + } + let writesize = std::cmp::min(filesize, bufsize) as usize; + file.write_all(&bytes[..writesize]) + .await + .context(format!("Error writing to {:?}", &path))?; + filesize -= writesize; + } + Ok(()) } #[async_recursion] @@ -53,7 +82,8 @@ async fn filegen( root: &std::path::Path, dirwidth: &[usize], numfiles: usize, - filesize: u64, + filesize: usize, + writebuf: usize, ) -> Result<()> { let numdirs = *dirwidth.first().unwrap_or(&0); let mut join_set = tokio::task::JoinSet::new(); @@ -65,22 +95,14 @@ async fn filegen( tokio::fs::create_dir(&path) .await .map_err(anyhow::Error::msg)?; - filegen(&path, &dirwidth, numfiles, 
filesize).await + filegen(&path, &dirwidth, numfiles, filesize, writebuf).await }; join_set.spawn(recurse()); } // generate files for i in 0..numfiles { let path = root.join(format!("file{}", i)); - let mut rng = rand::thread_rng(); - let mut bytes = vec![0u8; filesize as usize]; - rng.fill(&mut bytes[..]); - let create_file = || async move { - tokio::fs::write(path, &bytes) - .await - .map_err(anyhow::Error::msg) - }; - join_set.spawn(create_file()); + join_set.spawn(write_file(path.clone(), filesize, writebuf)); } while let Some(res) = join_set.join_next().await { res?? @@ -96,11 +118,19 @@ async fn main() -> Result<()> { .filesize .parse::() .unwrap() - .as_u64(); + .as_u64() as usize; + let writebuf = args.bufsize.parse::().unwrap().as_u64() as usize; let root = args.root.join("filegen"); tokio::fs::create_dir(&root) .await .context(format!("Error creating {:?}", &root))?; - filegen(&root, &args.dirwidth.value, args.numfiles, filesize).await?; + filegen( + &root, + &args.dirwidth.value, + args.numfiles, + filesize, + writebuf, + ) + .await?; Ok(()) } diff --git a/rcp/src/main.rs b/rcp/src/main.rs index 5ff3f49..702e977 100644 --- a/rcp/src/main.rs +++ b/rcp/src/main.rs @@ -74,10 +74,6 @@ struct Args { /// Number of blocking worker threads, 0 means Tokio runtime default (512) #[structopt(long, default_value = "0")] max_blocking_threads: usize, - - /// File copy read buffer size - #[structopt(long, default_value = "128KiB")] - read_buffer: String, } #[instrument] @@ -123,14 +119,8 @@ async fn async_main(args: Args) -> Result { std::path::PathBuf::from(dst_string), )] }; - let read_buffer = args - .read_buffer - .parse::() - .unwrap() - .as_u64() as usize; let mut join_set = tokio::task::JoinSet::new(); let settings = common::CopySettings { - read_buffer, dereference: args.dereference, fail_early: args.fail_early, overwrite: args.overwrite, diff --git a/rlink/src/main.rs b/rlink/src/main.rs index 1498dbd..1452f24 100644 --- a/rlink/src/main.rs +++ 
b/rlink/src/main.rs @@ -60,10 +60,6 @@ struct Args { /// Number of blocking worker threads, 0 means Tokio runtime default (512) #[structopt(long, default_value = "0")] max_blocking_threads: usize, - - /// File copy read buffer size - #[structopt(long, default_value = "128KiB")] - read_buffer: String, } async fn async_main(args: Args) -> Result { @@ -91,18 +87,12 @@ async fn async_main(args: Args) -> Result { } else { std::path::PathBuf::from(args.dst) }; - let read_buffer = args - .read_buffer - .parse::() - .unwrap() - .as_u64() as usize; common::link( &args.src, &dst, &args.update, &common::LinkSettings { copy_settings: common::CopySettings { - read_buffer, dereference: false, // currently not supported fail_early: args.fail_early, overwrite: args.overwrite, diff --git a/scripts/parse-strace.sh b/scripts/parse-strace.sh new file mode 100755 index 0000000..230dc0c --- /dev/null +++ b/scripts/parse-strace.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +# Capture strace output or provide filename as argument +strace_output=${1} + +# Check if strace output provided +if [[ -z "$strace_output" ]]; then + echo "Usage: $0 " + exit 1 +fi + +read_bytes=$(rg 'read.* = (\d+)' $strace_output -or '$1'|awk '{ SUM += $0 } END { print SUM }') +write_bytes=$(rg 'write.* = (\d+)' $strace_output -or '$1'|awk '{ SUM += $0 } END { print SUM }') + +# Print results +echo " read bytes: $read_bytes" +echo "write bytes: $write_bytes" diff --git a/scripts/runner.sh b/scripts/runner.sh new file mode 100755 index 0000000..a342a9d --- /dev/null +++ b/scripts/runner.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +root_dir=${1:-/tmp} +dirwidth=${2:-0} +num_files=${3:-1} +file_size=${4:-10M} + +cargo build --release + +export PATH=$(pwd)/target/release:$PATH + +rrm --quiet $root_dir/filegen $root_dir/filegen-test +filegen -- $root_dir $dirwidth $num_files $file_size + +echo $cwd +echo $pwd +strace -fttt rcp --progress --summary --overwrite $root_dir/filegen $root_dir/filegen-test 2> $root_dir/strace.log + 
+scripts/parse-strace.sh $root_dir/strace.log \ No newline at end of file