Skip to content

Commit

Permalink
fix: add transacitons (#689)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Apr 18, 2024
1 parent 8086559 commit f55b7e6
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 7 deletions.
20 changes: 20 additions & 0 deletions lib/pools/src/utils/crdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@ where
bail!("transaction failed with retry too many times");
}

/// Runs a transaction without retrying.
#[tracing::instrument(skip_all)]
pub async fn tx_no_retry<T, F>(crdb: &CrdbPool, f: F) -> GlobalResult<T>
where
for<'t> F: Fn(&'t mut sqlx::Transaction<'_, sqlx::Postgres>) -> AsyncResult<'t, T>,
{
let mut tx = crdb.begin().await?;

match f(&mut tx).await {
Err(err) => {
tx.rollback().await?;
Err(err)
}
Ok(x) => {
tx.commit().await?;
Ok(x)
}
}
}

// TODO: This seems to leak connections on retries, even though it matches the
// CRDB spec. This is likely because of odd behavior in the sqlx driver.
///// Runs a transaction. This explicitly handles retry errors.
Expand Down
27 changes: 24 additions & 3 deletions svc/pkg/cluster/worker/src/workers/server_drain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chirp_worker::prelude::*;
use futures_util::FutureExt;
use nomad_client::{
apis::{configuration::Configuration, nodes_api},
models,
Expand All @@ -14,24 +15,44 @@ struct Server {
datacenter_id: Uuid,
pool_type: i64,
nomad_node_id: Option<String>,
is_not_draining: bool,
}

#[worker(name = "cluster-server-drain")]
async fn worker(ctx: &OperationContext<cluster::msg::server_drain::Message>) -> GlobalResult<()> {
rivet_pools::utils::crdb::tx_no_retry(&ctx.crdb().await?, |tx| inner(ctx.clone(), tx).boxed())
.await?;

Ok(())
}

async fn inner(
ctx: OperationContext<cluster::msg::server_drain::Message>,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> GlobalResult<()> {
let server_id = unwrap_ref!(ctx.server_id).as_uuid();

let server = sql_fetch_one!(
[ctx, Server]
[ctx, Server, @tx tx]
"
SELECT
datacenter_id, pool_type, nomad_node_id
datacenter_id,
pool_type,
nomad_node_id,
(drain_ts IS NULL) AS is_not_draining
FROM db_cluster.servers
WHERE server_id = $1
FOR UPDATE
",
server_id,
)
.await?;

if server.is_not_draining {
tracing::error!("attempting to drain server that was not set as draining");
return Ok(());
}

// Fetch datacenter config
let datacenter_res = op!([ctx] cluster_datacenter_get {
datacenter_ids: vec![server.datacenter_id.into()],
Expand Down Expand Up @@ -95,7 +116,7 @@ async fn worker(ctx: &OperationContext<cluster::msg::server_drain::Message>) ->
backend::cluster::PoolType::Gg => {
// Delete DNS record
msg!([ctx] cluster::msg::server_dns_delete(server_id) {
server_id: ctx.server_id,
server_id: Some(server_id.into()),
})
.await?;
}
Expand Down
29 changes: 25 additions & 4 deletions svc/pkg/cluster/worker/src/workers/server_undrain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chirp_worker::prelude::*;
use futures_util::FutureExt;
use nomad_client::{
apis::{configuration::Configuration, nodes_api},
models,
Expand All @@ -14,25 +15,45 @@ struct Server {
datacenter_id: Uuid,
pool_type: i64,
nomad_node_id: Option<String>,
is_draining: bool,
}

#[worker(name = "cluster-server-undrain")]
async fn worker(ctx: &OperationContext<cluster::msg::server_undrain::Message>) -> GlobalResult<()> {
rivet_pools::utils::crdb::tx_no_retry(&ctx.crdb().await?, |tx| inner(ctx.clone(), tx).boxed())
.await?;

Ok(())
}

async fn inner(
ctx: OperationContext<cluster::msg::server_undrain::Message>,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> GlobalResult<()> {
let server_id = unwrap_ref!(ctx.server_id).as_uuid();

// NOTE: `drain_ts` will already be set to null before this worker is called
let server = sql_fetch_one!(
[ctx, Server]
[ctx, Server, @tx tx]
"
SELECT
datacenter_id, pool_type, nomad_node_id
datacenter_id,
pool_type,
nomad_node_id,
(drain_ts IS NOT NULL) AS is_draining
FROM db_cluster.servers
WHERE server_id = $1
FOR UPDATE
",
server_id
server_id,
)
.await?;

if server.is_draining {
tracing::error!("attempting to undrain server that was not set as undraining");
return Ok(());
}

let pool_type = unwrap!(backend::cluster::PoolType::from_i32(
server.pool_type as i32
));
Expand Down Expand Up @@ -77,7 +98,7 @@ async fn worker(ctx: &OperationContext<cluster::msg::server_undrain::Message>) -
backend::cluster::PoolType::Gg => {
// Recreate DNS record
msg!([ctx] cluster::msg::server_dns_create(server_id) {
server_id: ctx.server_id,
server_id: Some(server_id.into()),
})
.await?;
}
Expand Down

0 comments on commit f55b7e6

Please sign in to comment.