Skip to content

Commit

Permalink
fix(pageserver): preempt and retry azure list operation
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <chi@neon.tech>
  • Loading branch information
skyzh committed Nov 21, 2024
1 parent 725a5ff commit 4c8670e
Showing 1 changed file with 41 additions and 20 deletions.
61 changes: 41 additions & 20 deletions libs/remote_storage/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerCl
use bytes::Bytes;
use futures::future::Either;
use futures::stream::Stream;
use futures::FutureExt;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http_types::{StatusCode, Url};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use utils::backoff;
use utils::backoff::exponential_backoff_duration_seconds;

use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
use crate::{
Expand Down Expand Up @@ -302,40 +304,59 @@ impl RemoteStorage for AzureBlobStorage {

let mut next_marker = None;

let mut timeout_try_cnt = 1;

'outer: loop {
let mut builder = builder.clone();
if let Some(marker) = next_marker.clone() {
builder = builder.marker(marker);
}
let response = builder.into_stream();
let response = response.into_stream().map_err(to_download_error);
let response = tokio_stream::StreamExt::timeout(response, self.timeout);
let response = response.map(|res| match res {
Ok(res) => res,
Err(_elapsed) => Err(DownloadError::Timeout),
// Azure Blob Rust SDK does not expose the list blob API directly. Users have to use
// their pageable iterator wrapper that returns all keys as a stream. We want to have
// full control of paging, and therefore we only take the first item from the stream.
let mut response_stream = builder.into_stream();
let response = response_stream.next();
// Timeout mechanism: the Azure client will sometimes get stuck on a request, but retrying that
// request would immediately succeed. Therefore, we use an exponentially growing timeout to retry
// the request. (Usually, exponential backoff is used to determine the sleep time between two
// retries.) We start with a 10.0 second timeout, and double the timeout for each failure, up to
// 5 failures. timeout = min(5 * (1.0+1.0)^n, self.timeout), where n is `timeout_try_cnt`
// (starting at 1).
let this_timeout = (5.0 * exponential_backoff_duration_seconds(timeout_try_cnt, 1.0, self.timeout.as_secs_f64())).min(self.timeout.as_secs_f64());
let response = tokio::time::timeout(Duration::from_secs_f64(this_timeout), response);
let response = response.map(|res| {
match res {
Ok(Some(Ok(res))) => Ok(Some(res)),
Ok(Some(Err(e))) => Err(to_download_error(e)),
Ok(None) => Ok(None),
Err(_elasped) => Err(DownloadError::Timeout),
}
});

let mut response = std::pin::pin!(response);

let mut max_keys = max_keys.map(|mk| mk.get());
let next_item = tokio::select! {
op = response.next() => Ok(op),
op = response => op,
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}?;
};

if let Err(DownloadError::Timeout) = &next_item {
timeout_try_cnt += 1;
if timeout_try_cnt <= 5 {
continue;
}
}

let next_item = next_item?;

if timeout_try_cnt >= 2 {
tracing::warn!("Azure Blob Storage list timed out and succeeded after {} tries", timeout_try_cnt);
}
timeout_try_cnt = 1;

let Some(entry) = next_item else {
// The list is complete, so yield it.
break;
};

let mut res = Listing::default();
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
// The error is potentially retryable, so we must rewind the loop after yielding.
yield Err(e);
continue;
}
};
next_marker = entry.continuation();
let prefix_iter = entry
.blobs
Expand All @@ -351,7 +372,7 @@ impl RemoteStorage for AzureBlobStorage {
last_modified: k.properties.last_modified.into(),
size: k.properties.content_length,
}
);
);

for key in blob_iter {
res.keys.push(key);
Expand Down

0 comments on commit 4c8670e

Please sign in to comment.