Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy: retry compute wake in auth #4817

Merged
merged 3 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion proxy/src/auth/backend/classic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::ops::ControlFlow;

use super::AuthSuccess;
use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute,
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
proxy::{try_wake, NUM_RETRIES_CONNECT},
sasl, scram,
stream::PqStream,
};
Expand Down Expand Up @@ -48,7 +51,15 @@ pub(super) async fn authenticate(
}
};

let mut node = api.wake_compute(extra, creds).await?;
let mut num_retries = 0;
let mut node = loop {
num_retries += 1;
match try_wake(api, extra, creds).await? {
ControlFlow::Break(n) => break n,
ControlFlow::Continue(_) if num_retries < NUM_RETRIES_CONNECT => continue,
ControlFlow::Continue(e) => return Err(e.into()),
}
};
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
if let Some(keys) = scram_keys {
use tokio_postgres::config::AuthKeys;
node.config.auth_keys(AuthKeys::ScramSha256(keys));
Expand Down
19 changes: 19 additions & 0 deletions proxy/src/console/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod errors {
use crate::{
error::{io_error, UserFacingError},
http,
proxy::ShouldRetry,
};
use thiserror::Error;

Expand Down Expand Up @@ -72,6 +73,24 @@ pub mod errors {
}
}

impl ShouldRetry for ApiError {
    /// Reports whether this API failure is worth another attempt.
    ///
    /// * Transport failures defer to the wrapped I/O error's own `could_retry`.
    /// * Console responses are retryable when the status is `BAD_REQUEST`,
    ///   `LOCKED`, or any server-error status.
    /// * Everything else is treated as permanent.
    fn could_retry(&self) -> bool {
        match self {
            // Only some transport errors are transient; let the I/O error decide.
            Self::Transport(io_err) => io_err.could_retry(),
            // Temporary failures caused by the compute being in a bad state
            // (a bad request can be returned while the endpoint is in transition),
            // plus server-side errors.
            Self::Console { status: code, .. }
                if *code == http::StatusCode::BAD_REQUEST
                    || *code == http::StatusCode::LOCKED
                    || code.is_server_error() =>
            {
                true
            }
            // Any other response is not retryable.
            _ => false,
        }
    }
}

impl From<reqwest::Error> for ApiError {
fn from(e: reqwest::Error) -> Self {
io_error(e).into()
Expand Down
63 changes: 31 additions & 32 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,12 @@ use crate::{
cancellation::{self, CancelMap},
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{
self,
errors::{ApiError, WakeComputeError},
messages::MetricsAuxInfo,
},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo},
stream::{PqStream, Stream},
};
use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::TryFutureExt;
use hyper::StatusCode;
use metrics::{
exponential_buckets, register_histogram, register_int_counter_vec, Histogram, IntCounterVec,
};
Expand All @@ -33,7 +28,7 @@ use utils::measured_stream::MeasuredStream;

/// Number of times we should retry the `/proxy_wake_compute` http request.
/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
const NUM_RETRIES_CONNECT: u32 = 10;
pub const NUM_RETRIES_CONNECT: u32 = 10;
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);

Expand Down Expand Up @@ -413,13 +408,22 @@ where
loop {
match state {
ConnectionState::Invalid(config, err) => {
match try_wake(&config, extra, creds).await {
// we can't wake up the compute node
Ok(None) => return Err(err.into()),
let wake_res = match creds {
auth::BackendType::Console(api, creds) => {
try_wake(api.as_ref(), extra, creds).await
}
auth::BackendType::Postgres(api, creds) => {
try_wake(api.as_ref(), extra, creds).await
}
// nothing to do?
auth::BackendType::Link(_) => return Err(err.into()),
};

match wake_res {
// there was an error communicating with the control plane
Err(e) => return Err(e.into()),
// failed to wake up but we can continue to retry
Ok(Some(ControlFlow::Continue(()))) => {
Ok(ControlFlow::Continue(_)) => {
state = ConnectionState::Invalid(config, err);
let wait_duration = retry_after(num_retries);
num_retries += 1;
Expand All @@ -429,7 +433,8 @@ where
continue;
}
// successfully woke up a compute node and can break the wakeup loop
Ok(Some(ControlFlow::Break(mut node_info))) => {
Ok(ControlFlow::Break(mut node_info)) => {
node_info.config.reuse_password(&config);
mechanism.update_connect_config(&mut node_info.config);
state = ConnectionState::Cached(node_info)
}
Expand Down Expand Up @@ -465,28 +470,22 @@ where
}

/// Attempts to wake up the compute node.
/// * Returns Ok(Some(true)) if there was an error waking but retries are acceptable
/// * Returns Ok(Some(false)) if the wakeup succeeded
/// * Returns Ok(None) or Err(e) if there was an error
async fn try_wake(
config: &compute::ConnCfg,
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
/// * Returns Ok(Break(node)) if the wakeup succeeded
/// * Returns Err(e) if there was an error that should not be retried
pub async fn try_wake(
api: &impl console::Api,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<Option<ControlFlow<console::CachedNodeInfo>>, WakeComputeError> {
creds: &auth::ClientCredentials<'_>,
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
info!("compute node's state has likely changed; requesting a wake-up");
match creds.wake_compute(extra).await {
// retry wake if the compute was in an invalid state
Err(WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
})) => Ok(Some(ControlFlow::Continue(()))),
// Update `node_info` and try again.
Ok(Some(mut new)) => {
new.config.reuse_password(config);
Ok(Some(ControlFlow::Break(new)))
}
Err(e) => Err(e),
Ok(None) => Ok(None),
match api.wake_compute(extra, creds).await {
Err(err) => match &err {
WakeComputeError::ApiError(api) if api.could_retry() => Ok(ControlFlow::Continue(err)),
_ => Err(err),
},
// Wake-up succeeded: hand the fresh node info back to the caller.
Ok(new) => Ok(ControlFlow::Break(new)),
}
}

Expand Down