Skip to content

Commit

Permalink
proxy: Present new auth backend cplane_proxy_v1 (#10012)
Browse files Browse the repository at this point in the history
Implement a new auth backend based on the current Neon backend to switch
to the new Proxy V1 cplane API.

Implements [#21048](neondatabase/cloud#21048)
  • Loading branch information
awarus committed Dec 5, 2024
1 parent a83bd4e commit 0fd2115
Show file tree
Hide file tree
Showing 6 changed files with 634 additions and 2 deletions.
4 changes: 4 additions & 0 deletions proxy/src/auth/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ impl std::fmt::Display for Backend<'_, ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ControlPlane(api, ()) => match &**api {
ControlPlaneClient::ProxyV1(endpoint) => fmt
.debug_tuple("ControlPlane::ProxyV1")
.field(&endpoint.url())
.finish(),
ControlPlaneClient::Neon(endpoint) => fmt
.debug_tuple("ControlPlane::Neon")
.field(&endpoint.url())
Expand Down
99 changes: 98 additions & 1 deletion proxy/src/bin/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ enum AuthBackendType {
#[value(name("console"), alias("cplane"))]
ControlPlane,

#[value(name("cplane-v1"), alias("control-plane"))]
ControlPlaneV1,

#[value(name("link"), alias("control-redirect"))]
ConsoleRedirect,

Expand Down Expand Up @@ -518,6 +521,39 @@ async fn main() -> anyhow::Result<()> {
.instrument(span),
);
}
} else if let proxy::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
(client1, client2) => {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
}
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(
async move { cache.do_read(con, cancellation_token.clone()).await }
.instrument(span),
);
}
}
}

Expand Down Expand Up @@ -662,6 +698,65 @@ fn build_auth_backend(
args: &ProxyCliArgs,
) -> anyhow::Result<Either<&'static auth::Backend<'static, ()>, &'static ConsoleRedirectBackend>> {
match &args.auth_backend {
AuthBackendType::ControlPlaneV1 => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
args.project_info_cache.parse()?;
let endpoint_cache_config: config::EndpointCacheConfig =
args.endpoint_cache_config.parse()?;

info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}");
info!(
"Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}"
);
info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}");
let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new(
wake_compute_cache_config,
project_info_cache_config,
endpoint_cache_config,
)));

let config::ConcurrencyLockOptions {
shards,
limiter,
epoch,
timeout,
} = args.wake_compute_lock.parse()?;
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new(
"wake_compute_lock",
limiter,
shards,
timeout,
epoch,
&Metrics::get().wake_compute_lock,
)?));
tokio::spawn(locks.garbage_collect_worker());

let url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;

let endpoint = http::Endpoint::new(url, http::new_client());

let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));

let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
caches,
locks,
wake_compute_endpoint_rate_limiter,
);

let api = control_plane::client::ControlPlaneClient::ProxyV1(api);
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
let config = Box::leak(Box::new(auth_backend));

Ok(Either::Left(config))
}

AuthBackendType::ControlPlane => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
Expand Down Expand Up @@ -697,13 +792,15 @@ fn build_auth_backend(
)?));
tokio::spawn(locks.garbage_collect_worker());

let url = args.auth_endpoint.parse()?;
let url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;

let endpoint = http::Endpoint::new(url, http::new_client());

let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));

let api = control_plane::client::neon::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
Expand Down
Loading

0 comments on commit 0fd2115

Please sign in to comment.