Harden retries on tenant/timeline deletion path. (#4973)
Originated from a test failure where we got a SlowDown error from S3.
The patch generalizes `download_retry` so it is no longer download-specific.
The resulting `retry` function is moved to the utils crate; `download_retry`
is now a thin wrapper around this `retry` function.

To ensure that all needed retries are in place, the test code now uses the
`test_remote_failures=1` setting.

Ref https://neondb.slack.com/archives/C059ZC138NR/p1691743624353009
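As a rough usage sketch (not an excerpt from the diff): wrapping a remote-storage call in the new helper, mirroring the pattern adopted in `pageserver/src/tenant/delete.rs` below. The types `GenericRemoteStorage` and `RemotePath`, the import paths, and the hypothetical function name are assumptions; the constants and call shape are taken from the diff.

```rust
use anyhow::Context;
use remote_storage::{GenericRemoteStorage, RemotePath};
use utils::backoff;

use crate::tenant::remote_timeline_client::{
    FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD,
};

// Hypothetical helper: upload an empty marker object, retrying transient
// failures (e.g. S3 SlowDown) and treating no error as permanent.
async fn upload_mark_with_retries(
    remote_storage: &GenericRemoteStorage,
    remote_mark_path: &RemotePath,
) -> anyhow::Result<()> {
    let data: &[u8] = &[];
    backoff::retry(
        || async { remote_storage.upload(data, 0, remote_mark_path, None).await },
        |_e| false,                   // retry every error
        FAILED_UPLOAD_WARN_THRESHOLD, // start logging warnings after this many attempts
        FAILED_REMOTE_OP_RETRIES,     // give up after this many attempts
        "mark_upload",
    )
    .await
    .context("mark_upload")
}
```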
LizardWizzard authored Aug 14, 2023
1 parent 49c57c0 commit 4626d89
Showing 10 changed files with 324 additions and 155 deletions.
188 changes: 188 additions & 0 deletions libs/utils/src/backoff.rs
@@ -0,0 +1,188 @@
use std::fmt::{Debug, Display};

use futures::Future;

pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;

pub async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
let backoff_duration_seconds =
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
if backoff_duration_seconds > 0.0 {
tracing::info!(
"Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
);
tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await;
}
}

pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
if n == 0 {
0.0
} else {
(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
}
}

/// Retries the passed operation until one of the following conditions is met:
/// the encountered error is considered permanent (non-retryable),
/// or the retries have been exhausted.
/// The `is_permanent` closure is used to distinguish permanent from non-permanent errors.
/// When the attempt count crosses `warn_threshold`, the function starts to emit log warnings.
/// The `description` argument is added to log messages; its value should identify what `op` is doing.
pub async fn retry<T, O, F, E>(
mut op: O,
is_permanent: impl Fn(&E) -> bool,
warn_threshold: u32,
max_retries: u32,
description: &str,
) -> Result<T, E>
where
// Not std::error::Error because anyhow::Error doesn't implement it.
// For context see https://github.com/dtolnay/anyhow/issues/63
E: Display + Debug,
O: FnMut() -> F,
F: Future<Output = Result<T, E>>,
{
let mut attempts = 0;
loop {
let result = op().await;
match result {
Ok(_) => {
if attempts > 0 {
tracing::info!("{description} succeeded after {attempts} retries");
}
return result;
}

// These are "permanent" errors that should not be retried.
Err(ref e) if is_permanent(e) => {
return result;
}
// Assume that any other failure might be transient, and the operation might
// succeed if we just keep trying.
Err(err) if attempts < warn_threshold => {
tracing::info!("{description} failed, will retry (attempt {attempts}): {err:#}");
}
Err(err) if attempts < max_retries => {
tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
}
Err(ref err) => {
// Operation failed `max_retries` times. Time to give up.
tracing::warn!(
"{description} still failed after {attempts} retries, giving up: {err:?}"
);
return result;
}
}
// sleep and retry
exponential_backoff(
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
)
.await;
attempts += 1;
}
}

#[cfg(test)]
mod tests {
use std::io;

use tokio::sync::Mutex;

use super::*;

#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {
let mut current_backoff_value = None;

for i in 0..10_000 {
let new_backoff_value = exponential_backoff_duration_seconds(
i,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);

if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
assert!(
old_backoff_value <= new_backoff_value,
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
)
}
}

assert_eq!(
current_backoff_value.expect("Should have produced backoff values to compare"),
DEFAULT_MAX_BACKOFF_SECONDS,
"Given big enough of retries, backoff should reach its allowed max value"
);
}

#[tokio::test(start_paused = true)]
async fn retry_always_error() {
let count = Mutex::new(0);
let err_result = retry(
|| async {
*count.lock().await += 1;
Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other))
},
|_e| false,
1,
1,
"work",
)
.await;

assert!(err_result.is_err());

assert_eq!(*count.lock().await, 2);
}

#[tokio::test(start_paused = true)]
async fn retry_ok_after_err() {
let count = Mutex::new(0);
retry(
|| async {
let mut locked = count.lock().await;
if *locked > 1 {
Ok(())
} else {
*locked += 1;
Err(io::Error::from(io::ErrorKind::Other))
}
},
|_e| false,
2,
2,
"work",
)
.await
.unwrap();
}

#[tokio::test(start_paused = true)]
async fn dont_retry_permanent_errors() {
let count = Mutex::new(0);
let _ = retry(
|| async {
let mut locked = count.lock().await;
if *locked > 1 {
Ok(())
} else {
*locked += 1;
Err(io::Error::from(io::ErrorKind::Other))
}
},
|_e| true,
2,
2,
"work",
)
.await
.unwrap_err();

assert_eq!(*count.lock().await, 1);
}
}
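
As an aside (not part of the diff), a minimal, self-contained sketch of the schedule the default constants produce: the sleep is 0 for the first attempt, then grows by a factor of 1.1 per attempt and saturates at the 3-second cap around the 12th attempt.

```rust
// Illustrative only: print the backoff schedule implied by the defaults above
// (DEFAULT_BASE_BACKOFF_SECONDS = 0.1, DEFAULT_MAX_BACKOFF_SECONDS = 3.0).
fn main() {
    for n in 0..=15u32 {
        // Same formula as `exponential_backoff_duration_seconds`.
        let seconds = if n == 0 {
            0.0
        } else {
            (1.0_f64 + 0.1).powf(f64::from(n)).min(3.0)
        };
        println!("attempt {n}: sleep {seconds:.2}s");
    }
}
```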
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
@@ -1,6 +1,8 @@
//! `utils` is intended to be a place to put code that is shared
//! between other crates in this repository.
pub mod backoff;

/// `Lsn` type implements common tasks on Log Sequence Numbers
pub mod lsn;
/// SeqWait allows waiting for a future sequence number to arrive
53 changes: 0 additions & 53 deletions pageserver/src/lib.rs
@@ -95,28 +95,6 @@ pub async fn shutdown_pageserver(exit_code: i32) {
std::process::exit(exit_code);
}

const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;

async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
let backoff_duration_seconds =
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
if backoff_duration_seconds > 0.0 {
info!(
"Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
);
tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await;
}
}

pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
if n == 0 {
0.0
} else {
(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
}
}

/// The name of the metadata file pageserver creates per timeline.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
pub const METADATA_FILE_NAME: &str = "metadata";
@@ -238,37 +216,6 @@ async fn timed<Fut: std::future::Future>(
}
}

#[cfg(test)]
mod backoff_defaults_tests {
use super::*;

#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {
let mut current_backoff_value = None;

for i in 0..10_000 {
let new_backoff_value = exponential_backoff_duration_seconds(
i,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);

if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
assert!(
old_backoff_value <= new_backoff_value,
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
)
}
}

assert_eq!(
current_backoff_value.expect("Should have produced backoff values to compare"),
DEFAULT_MAX_BACKOFF_SECONDS,
"Given big enough of retries, backoff should reach its allowed max value"
);
}
}

#[cfg(test)]
mod timed_tests {
use super::timed;
79 changes: 46 additions & 33 deletions pageserver/src/tenant/delete.rs
@@ -10,7 +10,7 @@ use tokio::sync::OwnedMutexGuard;
use tracing::{error, info, instrument, warn, Instrument, Span};

use utils::{
completion, crashsafe, fs_ext,
backoff, completion, crashsafe, fs_ext,
id::{TenantId, TimelineId},
};

@@ -23,12 +23,13 @@ use crate::{

use super::{
mgr::{GetTenantError, TenantsMap},
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
span,
timeline::delete::DeleteTimelineFlow,
tree_sort_timelines, DeleteTimelineError, Tenant,
};

const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u8 = 3;
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;

#[derive(Debug, thiserror::Error)]
pub enum DeleteTenantError {
@@ -71,10 +72,19 @@ async fn create_remote_delete_mark(
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id)?;

let data: &[u8] = &[];
remote_storage
.upload(data, 0, &remote_mark_path, None)
.await
.context("mark upload")?;
backoff::retry(
|| async {
remote_storage
.upload(data, 0, &remote_mark_path, None)
.await
},
|_e| false,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"mark_upload",
)
.await
.context("mark_upload")?;

Ok(())
}
@@ -154,9 +164,16 @@ async fn remove_tenant_remote_delete_mark(
tenant_id: &TenantId,
) -> Result<(), DeleteTenantError> {
if let Some(remote_storage) = remote_storage {
remote_storage
.delete(&remote_tenant_delete_mark_path(conf, tenant_id)?)
.await?;
let path = remote_tenant_delete_mark_path(conf, tenant_id)?;
backoff::retry(
|| async { remote_storage.delete(&path).await },
|_e| false,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark",
)
.await
.context("remove_tenant_remote_delete_mark")?;
}
Ok(())
}
@@ -337,32 +354,28 @@ impl DeleteTenantFlow {
return Ok(acquire(tenant));
}

let remote_storage = match remote_storage {
Some(remote_storage) => remote_storage,
None => return Ok(None),
};

// If remote storage is there we rely on it
if let Some(remote_storage) = remote_storage {
let remote_mark_path = remote_tenant_delete_mark_path(conf, &tenant_id)?;

let attempt = 1;
loop {
match remote_storage.download(&remote_mark_path).await {
Ok(_) => return Ok(acquire(tenant)),
Err(e) => {
if matches!(e, DownloadError::NotFound) {
return Ok(None);
}
if attempt > SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS {
return Err(anyhow::anyhow!(e))?;
}

warn!(
"failed to fetch tenant deletion mark at {} attempt {}",
&remote_mark_path, attempt
)
}
}
}
}
let remote_mark_path = remote_tenant_delete_mark_path(conf, &tenant_id)?;

let result = backoff::retry(
|| async { remote_storage.download(&remote_mark_path).await },
|e| matches!(e, DownloadError::NotFound),
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
"fetch_tenant_deletion_mark",
)
.await;

Ok(None)
match result {
Ok(_) => Ok(acquire(tenant)),
Err(DownloadError::NotFound) => Ok(None),
Err(e) => Err(anyhow::anyhow!(e)).context("should_resume_deletion")?,
}
}

pub(crate) async fn resume(

1 comment on commit 4626d89

@github-actions

1656 tests run: 1574 passed, 0 failed, 82 skipped (full report)


The comment gets automatically updated with the latest test results
4626d89 at 2023-08-14T14:59:02.420Z :recycle:
