Skip to content

Commit

Permalink
rename to monitoring and remove public fields
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Jan 4, 2024
1 parent d2cdc3e commit 2146bcf
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 130 deletions.
19 changes: 9 additions & 10 deletions proxy/src/auth/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::auth::credentials::check_peer_addr_is_in_list;
use crate::auth::validate_password_and_exchange;
use crate::console::errors::GetAuthInfoError;
use crate::console::AuthSecret;
use crate::context::RequestContext;
use crate::context::RequestMonitoring;
use crate::proxy::connect_compute::handle_try_wake;
use crate::proxy::retry::retry_after;
use crate::scram;
Expand Down Expand Up @@ -163,7 +163,7 @@ impl TryFrom<ClientCredentials> for ComputeUserInfo {
///
/// All authentication flows will emit an AuthenticationOk message if successful.
async fn auth_quirks(
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
api: &impl console::Api,
creds: ClientCredentials,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
Expand All @@ -177,7 +177,7 @@ async fn auth_quirks(
Err(info) => {
let res = hacks::password_hack_no_authentication(info, client, &mut ctx.latency_timer)
.await?;
ctx.endpoint_id = Some(res.info.endpoint.clone());
ctx.set_endpoint_id(Some(res.info.endpoint.clone()));
(res.info, Some(res.keys))
}
Ok(info) => (info, None),
Expand Down Expand Up @@ -222,7 +222,7 @@ async fn auth_quirks(
}

async fn authenticate_with_secret(
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
secret: AuthSecret,
info: ComputeUserInfo,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
Expand Down Expand Up @@ -261,7 +261,7 @@ async fn authenticate_with_secret(
/// Authenticate the user and then wake a compute (or retrieve an existing compute session from cache)
/// only if authentication was successfuly.
async fn auth_and_wake_compute(
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
api: &impl console::Api,
extra: &ConsoleReqExtra,
creds: ClientCredentials,
Expand Down Expand Up @@ -292,8 +292,7 @@ async fn auth_and_wake_compute(
tokio::time::sleep(wait_duration).await;
};

ctx.branch = Some(node.aux.branch_id.clone());
ctx.project = Some(node.aux.project_id.clone());
ctx.set_project(node.aux.clone());

match compute_credentials.keys {
#[cfg(feature = "testing")]
Expand Down Expand Up @@ -337,7 +336,7 @@ impl<'a> BackendType<'a, ClientCredentials> {
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
pub async fn authenticate(
self,
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
extra: &ConsoleReqExtra,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
allow_cleartext: bool,
Expand Down Expand Up @@ -410,7 +409,7 @@ impl<'a> BackendType<'a, ClientCredentials> {
impl BackendType<'_, ComputeUserInfo> {
pub async fn get_allowed_ips(
&self,
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
use BackendType::*;
match self {
Expand All @@ -427,7 +426,7 @@ impl BackendType<'_, ComputeUserInfo> {
/// The link auth flow doesn't support this, so we return [`None`] in that case.
pub async fn wake_compute(
&self,
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
extra: &ConsoleReqExtra,
) -> Result<Option<CachedNodeInfo>, console::errors::WakeComputeError> {
use BackendType::*;
Expand Down
51 changes: 36 additions & 15 deletions proxy/src/auth/credentials.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! User credentials used in authentication.
use crate::{
auth::password_hack::parse_endpoint_param, error::UserFacingError,
auth::password_hack::parse_endpoint_param, context::RequestMonitoring, error::UserFacingError,
metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::neon_options_str,
};
use itertools::Itertools;
Expand Down Expand Up @@ -55,6 +55,7 @@ impl ClientCredentials {

impl ClientCredentials {
pub fn parse(
ctx: &mut RequestMonitoring,
params: &StartupMessageParams,
sni: Option<&str>,
common_names: Option<HashSet<String>>,
Expand All @@ -63,7 +64,12 @@ impl ClientCredentials {

// Some parameters are stored in the startup message.
let get_param = |key| params.get(key).ok_or(MissingKey(key));
let user = get_param("user")?.into();
let user: SmolStr = get_param("user")?.into();

// record the values if we have them
ctx.set_application(params.get("application_name").map(SmolStr::from));
ctx.set_user(user.clone());
ctx.set_endpoint_id(sni.map(SmolStr::from));

// Project name might be passed via PG's command-line options.
let project_option = params
Expand Down Expand Up @@ -216,7 +222,8 @@ mod tests {
fn parse_bare_minimum() -> anyhow::Result<()> {
// According to postgresql, only `user` should be required.
let options = StartupMessageParams::new([("user", "john_doe")]);
let creds = ClientCredentials::parse(&options, None, None)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project, None);

Expand All @@ -230,7 +237,8 @@ mod tests {
("database", "world"), // should be ignored
("foo", "bar"), // should be ignored
]);
let creds = ClientCredentials::parse(&options, None, None)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project, None);

Expand All @@ -244,7 +252,8 @@ mod tests {
let sni = Some("foo.localhost");
let common_names = Some(["localhost".into()].into());

let creds = ClientCredentials::parse(&options, sni, common_names)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, sni, common_names)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("foo"));
assert_eq!(creds.cache_key, "foo");
Expand All @@ -259,7 +268,8 @@ mod tests {
("options", "-ckey=1 project=bar -c geqo=off"),
]);

let creds = ClientCredentials::parse(&options, None, None)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("bar"));

Expand All @@ -273,7 +283,8 @@ mod tests {
("options", "-ckey=1 endpoint=bar -c geqo=off"),
]);

let creds = ClientCredentials::parse(&options, None, None)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("bar"));

Expand All @@ -290,7 +301,8 @@ mod tests {
),
]);

let creds = ClientCredentials::parse(&options, None, None)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert!(creds.project.is_none());

Expand All @@ -304,7 +316,8 @@ mod tests {
("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"),
]);

let creds = ClientCredentials::parse(&options, None, None)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert!(creds.project.is_none());

Expand All @@ -318,7 +331,8 @@ mod tests {
let sni = Some("baz.localhost");
let common_names = Some(["localhost".into()].into());

let creds = ClientCredentials::parse(&options, sni, common_names)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, sni, common_names)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("baz"));

Expand All @@ -331,12 +345,14 @@ mod tests {

let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.a.com");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("p1"));

let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.b.com");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("p1"));

Ok(())
Expand All @@ -350,7 +366,9 @@ mod tests {
let sni = Some("second.localhost");
let common_names = Some(["localhost".into()].into());

let err = ClientCredentials::parse(&options, sni, common_names).expect_err("should fail");
let mut ctx = RequestMonitoring::test();
let err = ClientCredentials::parse(&mut ctx, &options, sni, common_names)
.expect_err("should fail");
match err {
InconsistentProjectNames { domain, option } => {
assert_eq!(option, "first");
Expand All @@ -367,7 +385,9 @@ mod tests {
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());

let err = ClientCredentials::parse(&options, sni, common_names).expect_err("should fail");
let mut ctx = RequestMonitoring::test();
let err = ClientCredentials::parse(&mut ctx, &options, sni, common_names)
.expect_err("should fail");
match err {
UnknownCommonName { cn } => {
assert_eq!(cn, "localhost");
Expand All @@ -385,7 +405,8 @@ mod tests {

let sni = Some("project.localhost");
let common_names = Some(["localhost".into()].into());
let creds = ClientCredentials::parse(&options, sni, common_names)?;
let mut ctx = RequestMonitoring::test();
let creds = ClientCredentials::parse(&mut ctx, &options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("project"));
assert_eq!(creds.cache_key, "projectendpoint_type:read_write lsn:0/2");

Expand Down
7 changes: 4 additions & 3 deletions proxy/src/bin/pg_sni_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{net::SocketAddr, sync::Arc};
use futures::future::Either;
use itertools::Itertools;
use proxy::config::TlsServerEndPoint;
use proxy::context::RequestContext;
use proxy::context::RequestMonitoring;
use proxy::proxy::run_until_cancelled;
use tokio::net::TcpListener;

Expand Down Expand Up @@ -171,7 +171,8 @@ async fn task_main(
.context("failed to set socket option")?;

info!(%peer_addr, "serving");
let mut ctx = RequestContext::new(session_id, peer_addr.ip(), "sni_router", "sni");
let mut ctx =
RequestMonitoring::new(session_id, peer_addr.ip(), "sni_router", "sni");
handle_client(
&mut ctx,
dest_suffix,
Expand Down Expand Up @@ -245,7 +246,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
}

async fn handle_client(
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint,
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/compute.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError,
context::RequestContext, error::UserFacingError, metrics::NUM_DB_CONNECTIONS_GAUGE,
context::RequestMonitoring, error::UserFacingError, metrics::NUM_DB_CONNECTIONS_GAUGE,
proxy::neon_option,
};
use futures::{FutureExt, TryFutureExt};
Expand Down Expand Up @@ -233,7 +233,7 @@ impl ConnCfg {
/// Connect to a corresponding compute node.
pub async fn connect(
&self,
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
allow_self_signed_compute: bool,
timeout: Duration,
) -> Result<PostgresConnection, ConnectionError> {
Expand Down
8 changes: 4 additions & 4 deletions proxy/src/console/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
auth::backend::ComputeUserInfo,
cache::{timed_lru, TimedLru},
compute,
context::RequestContext,
context::RequestMonitoring,
scram,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -261,20 +261,20 @@ pub trait Api {
/// Get the client's auth secret for authentication.
async fn get_role_secret(
&self,
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
creds: &ComputeUserInfo,
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;

async fn get_allowed_ips(
&self,
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
creds: &ComputeUserInfo,
) -> Result<Arc<Vec<String>>, errors::GetAuthInfoError>;

/// Wake up the compute node and return the corresponding connection info.
async fn wake_compute(
&self,
ctx: &mut RequestContext,
ctx: &mut RequestMonitoring,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
Expand Down
8 changes: 4 additions & 4 deletions proxy/src/console/provider/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{
AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use crate::{console::provider::CachedRoleSecret, context::RequestContext};
use crate::{console::provider::CachedRoleSecret, context::RequestMonitoring};
use async_trait::async_trait;
use futures::TryFutureExt;
use thiserror::Error;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_role_secret(
&self,
_ctx: &mut RequestContext,
_ctx: &mut RequestMonitoring,
creds: &ComputeUserInfo,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
Ok(CachedRoleSecret::new_uncached(
Expand All @@ -155,7 +155,7 @@ impl super::Api for Api {

async fn get_allowed_ips(
&self,
_ctx: &mut RequestContext,
_ctx: &mut RequestMonitoring,
creds: &ComputeUserInfo,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
Ok(Arc::new(self.do_get_auth_info(creds).await?.allowed_ips))
Expand All @@ -164,7 +164,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
_ctx: &mut RequestContext,
_ctx: &mut RequestMonitoring,
_extra: &ConsoleReqExtra,
_creds: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
Expand Down
Loading

0 comments on commit 2146bcf

Please sign in to comment.