Replies: 1 comment
-
In case it helps anyone, I came up with this, which seems to work (not tested in prod). One concern is how heavy the pool is to clone: does anyone know whether it's safe to clone it many times?
```rust
use async_trait::async_trait;
use sqlx::{PgConnection, Pool, Postgres};

// Maximum number of times the body is re-run after a retryable error.
const MAX_RETRIES: u32 = 5;

#[async_trait]
trait Transactable {
    type T: Send;

    // The transaction body. It may run more than once, so it must be safe to repeat.
    async fn run(&mut self, txn: &mut PgConnection) -> Result<Self::T, sqlx::Error>;

    fn db(&mut self) -> Pool<Postgres>;

    async fn execute(&mut self) -> Result<Self::T, sqlx::Error> {
        let mut txn = self.db().begin().await?;
        sqlx::query("SAVEPOINT cockroach_restart")
            .execute(&mut *txn)
            .await?;
        let mut retries = 0;
        loop {
            match self.run(&mut *txn).await {
                Ok(result) => {
                    sqlx::query("RELEASE SAVEPOINT cockroach_restart")
                        .execute(&mut *txn)
                        .await?;
                    // Follow RELEASE with COMMIT, as in CockroachDB's documented retry
                    // protocol; otherwise sqlx issues a ROLLBACK when `txn` is dropped.
                    txn.commit().await?;
                    return Ok(result);
                }
                // SQLSTATE 40001 is the retryable serialization failure.
                Err(err)
                    if retries < MAX_RETRIES
                        && err
                            .as_database_error()
                            .map_or(false, |e| e.code().as_deref() == Some("40001")) =>
                {
                    sqlx::query("ROLLBACK TO SAVEPOINT cockroach_restart")
                        .execute(&mut *txn)
                        .await?;
                    retries += 1;
                }
                // Dropping `txn` rolls the transaction back, so the original
                // error is returned without being masked by a failed rollback.
                Err(err) => return Err(err),
            }
        }
    }
}

struct SaveThing {
    db: Pool<Postgres>,
}

#[async_trait]
impl Transactable for SaveThing {
    // `Thing` is the row type for the `thing` table, defined elsewhere.
    type T = Thing;

    fn db(&mut self) -> Pool<Postgres> {
        self.db.clone()
    }

    async fn run(&mut self, txn: &mut PgConnection) -> Result<Thing, sqlx::Error> {
        sqlx::query_as!(
            Thing,
            "
            update thing set
                title = 'new title'
            returning *
            "
        )
        .fetch_one(&mut *txn)
        .await
    }
}
```
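
To make the statement order of the savepoint protocol easier to check without a database, here is a minimal std-only sketch. `Conn`, `DbError`, `MockConn`, `run_with_savepoint`, and `demo` are hypothetical names invented for this illustration and are not part of sqlx. It also assumes, following CockroachDB's client-side retry documentation, that a 40001 raised by `RELEASE SAVEPOINT` should be retried the same way as one raised by the body.

```rust
/// Hypothetical stand-in for a database error carrying a SQLSTATE code.
#[derive(Debug, Clone, PartialEq)]
struct DbError {
    code: &'static str,
}

/// Hypothetical minimal connection: it can only execute statements.
trait Conn {
    fn exec(&mut self, sql: &str) -> Result<(), DbError>;
}

const RETRYABLE: &str = "40001";

/// Runs `body` under the savepoint protocol, re-running it at most `max_retries` times.
fn run_with_savepoint<C: Conn, T>(
    conn: &mut C,
    max_retries: u32,
    mut body: impl FnMut(&mut C) -> Result<T, DbError>,
) -> Result<T, DbError> {
    conn.exec("BEGIN")?;
    conn.exec("SAVEPOINT cockroach_restart")?;
    let mut retries = 0;
    loop {
        // A failure of RELEASE is handled exactly like a failure of the body.
        let attempt = match body(&mut *conn) {
            Ok(value) => conn
                .exec("RELEASE SAVEPOINT cockroach_restart")
                .map(|()| value),
            Err(err) => Err(err),
        };
        match attempt {
            Ok(value) => {
                conn.exec("COMMIT")?;
                return Ok(value);
            }
            Err(err) if err.code == RETRYABLE && retries < max_retries => {
                conn.exec("ROLLBACK TO SAVEPOINT cockroach_restart")?;
                retries += 1;
            }
            Err(err) => {
                // Best effort: keep the original error even if ROLLBACK fails.
                let _ = conn.exec("ROLLBACK");
                return Err(err);
            }
        }
    }
}

/// Records every statement and fails the first `failures` RELEASE attempts with 40001.
struct MockConn {
    log: Vec<String>,
    failures: u32,
}

impl Conn for MockConn {
    fn exec(&mut self, sql: &str) -> Result<(), DbError> {
        self.log.push(sql.to_string());
        if sql.starts_with("RELEASE") && self.failures > 0 {
            self.failures -= 1;
            return Err(DbError { code: RETRYABLE });
        }
        Ok(())
    }
}

/// Runs one update against the mock and returns the result plus the statement log.
fn demo() -> (Result<i32, DbError>, Vec<String>) {
    let mut conn = MockConn { log: Vec::new(), failures: 1 };
    let result = run_with_savepoint(&mut conn, 5, |c| {
        c.exec("UPDATE thing SET title = 'new title'")?;
        Ok(42)
    });
    (result, conn.log)
}
```

With one simulated failure, the log from `demo()` shows the update running twice, separated by a `ROLLBACK TO SAVEPOINT cockroach_restart`, and a single `COMMIT` at the end.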
-
I'm wondering how I can implement a wrapper around sqlx, or enhance it, to allow generic retries of an executed transaction, as described here: https://www.cockroachlabs.com/docs/v23.1/transactions#transaction-retries
I see an example here: https://www.cockroachlabs.com/docs/stable/build-a-rust-app-with-cockroachdb but I'm struggling to translate it to sqlx. I tried building my own wrapping Executor impl, and while this all compiles, I'm not sure where to inject the retry logic in the function `fetch_many`, or how to, as it returns a `BoxStream`.
I also attempted to build a generic `txn` function that could take a `db` and an async block that yielded a `tx` executor to a closure, so that multiple statements could be executed with retries, but I struggled to build that as well. Something like this:
Any hints on how to build a generic wrapper around a transaction that can retry on specific return codes from the db?
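
As a starting point for the "retry on specific return codes" part, here is a minimal synchronous sketch that uses only the standard library. `retry_on`, `DbError`, and `demo_retry` are hypothetical names, and the exponential backoff is an assumption rather than something the question asks for. It does not touch sqlx; an async version would likely need the closure to return a boxed future that borrows the transaction, which is not shown here.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical stand-in for a database error carrying a SQLSTATE code.
#[derive(Debug, PartialEq)]
struct DbError {
    code: &'static str,
}

/// Calls `op` until it succeeds, fails with an error that `is_retryable`
/// rejects, or `max_retries` retries have been used. `op` receives the
/// zero-based attempt number.
fn retry_on<T, E>(
    max_retries: u32,
    base_delay: Duration,
    is_retryable: impl Fn(&E) -> bool,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) if attempt < max_retries && is_retryable(&err) => {
                // Exponential backoff: base, 2x base, 4x base, ...
                sleep(base_delay.saturating_mul(2u32.saturating_pow(attempt)));
                attempt += 1;
            }
            Err(err) => return Err(err),
        }
    }
}

/// Simulates an operation that fails twice with 40001 and then succeeds.
/// Returns the final result and how many times the operation was called.
fn demo_retry() -> (Result<&'static str, DbError>, u32) {
    let mut calls = 0;
    let result = retry_on(
        5,
        Duration::from_millis(1),
        |err: &DbError| err.code == "40001",
        |_attempt| {
            calls += 1;
            if calls < 3 {
                Err(DbError { code: "40001" })
            } else {
                Ok("committed")
            }
        },
    );
    (result, calls)
}
```

Keeping the retry predicate as a parameter means the same loop can be reused for other codes without changing the loop itself. An error the predicate rejects is returned after a single call.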