Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

add retry with backoff to azcopy calls #701

Merged
5 commits merged into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/agent/onefuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ process_control = "3.0"
reqwest-retry = { path = "../reqwest-retry"}
onefuzz-telemetry = { path = "../onefuzz-telemetry"}
stacktrace-parser = { path = "../stacktrace-parser" }
backoff = { version = "0.3", features = ["async-std"] }

[target.'cfg(target_os = "windows")'.dependencies]
winreg = "0.8"
Expand Down
121 changes: 78 additions & 43 deletions src/agent/onefuzz/src/az_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,43 @@
// Licensed under the MIT License.

use anyhow::{Context, Result};
use backoff::{self, future::retry_notify, ExponentialBackoff};
use std::ffi::OsStr;
use std::fmt;
use std::process::Stdio;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::process::Command;

pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>, delete_dst: bool) -> Result<()> {
use std::process::Stdio;
const RETRY_INTERVAL: Duration = Duration::from_secs(5);
const RETRY_COUNT: usize = 5;

let mut cmd = Command::new("azcopy");
#[derive(Clone, Copy)]
enum Mode {
Copy,
Sync,
}

impl fmt::Display for Mode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let as_str = match self {
Mode::Copy => "copy",
Mode::Sync => "sync",
};
write!(f, "{}", as_str)
}
}

async fn az_impl(mode: Mode, src: &OsStr, dst: &OsStr, args: &[&str]) -> Result<()> {
let mut cmd = Command::new("azcopy");
cmd.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.arg("sync")
.arg(mode.to_string())
.arg(&src)
.arg(&dst);

if delete_dst {
cmd.arg("--delete-destination");
}
.arg(&dst)
.args(args);

let output = cmd
.spawn()
Expand All @@ -32,9 +50,10 @@ pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>, delete_dst: bo
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!(
"sync failed src:{:?} dst:{:?} stdout:{:?} stderr:{:?}",
src.as_ref(),
dst.as_ref(),
"azcopy {} failed src:{:?} dst:{:?} stdout:{:?} stderr:{:?}",
mode,
src,
dst,
stdout,
stderr
);
Expand All @@ -43,40 +62,56 @@ pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>, delete_dst: bo
Ok(())
}

pub async fn copy(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>, recursive: bool) -> Result<()> {
use std::process::Stdio;
async fn retry_az_impl(mode: Mode, src: &OsStr, dst: &OsStr, args: &[&str]) -> Result<()> {
let counter = AtomicUsize::new(0);

let mut cmd = Command::new("azcopy");
let operation = || async {
let attempt_count = counter.fetch_add(1, Ordering::SeqCst);
let result = az_impl(mode, src, dst, args)
.await
.with_context(|| format!("azcopy {} attempt {} failed", mode, attempt_count + 1));
match result {
Ok(()) => Ok(()),
Err(x) => {
if attempt_count >= RETRY_COUNT {
Err(backoff::Error::Permanent(x))
} else {
Err(backoff::Error::Transient(x))
}
}
}
};

cmd.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.arg("copy")
.arg(&src)
.arg(&dst);
retry_notify(
ExponentialBackoff {
current_interval: RETRY_INTERVAL,
initial_interval: RETRY_INTERVAL,
..ExponentialBackoff::default()
},
operation,
|err, dur| warn!("request attempt failed after {:?}: {:?}", dur, err),
)
.await?;

if recursive {
cmd.arg("--recursive=true");
}
Ok(())
}

let output = cmd
.spawn()
.context("azcopy failed to start")?
.wait_with_output()
.await
.context("azcopy failed to run")?;
if !output.status.success() {
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!(
"copy failed src:{:?} dst:{:?} stdout:{:?} stderr:{:?}",
src.as_ref(),
dst.as_ref(),
stdout,
stderr
);
}
pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>, delete_dst: bool) -> Result<()> {
let args = if delete_dst {
vec!["--delete_destination"]
} else {
vec![]
};

Ok(())
retry_az_impl(Mode::Sync, src.as_ref(), dst.as_ref(), &args).await
}

pub async fn copy(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>, recursive: bool) -> Result<()> {
let args = if recursive {
vec!["--recursive=true"]
} else {
vec![]
};

retry_az_impl(Mode::Copy, src.as_ref(), dst.as_ref(), &args).await
}
2 changes: 1 addition & 1 deletion src/agent/reqwest-retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ log = "0.4"
onefuzz-telemetry = { path = "../onefuzz-telemetry" }

[dev-dependencies]
tokio = { version = "0.2" , features=["macros"] }
tokio = { version = "0.2" , features=["macros"] }