Skip to content

Commit

Permalink
fix(workflows): add retry delay for txn errors (#1138)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->
Fixes RVTEE-597
## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Sep 28, 2024
1 parent 4b0be5f commit 614846b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
16 changes: 9 additions & 7 deletions lib/chirp-workflow/core/src/db/pg_nats.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use indoc::indoc;
use rivet_pools::prelude::NatsPool;
Expand All @@ -22,6 +22,8 @@ use crate::{
const MAX_PULLED_WORKFLOWS: i64 = 50;
// Base retry for query retry backoff
const QUERY_RETRY_MS: usize = 750;
// Time in between transaction retries
const TXN_RETRY: Duration = Duration::from_millis(100);
/// Maximum times a query ran bu this database adapter is retried.
const MAX_QUERY_RETRIES: usize = 16;

Expand Down Expand Up @@ -89,16 +91,16 @@ impl DatabasePgNats {
use sqlx::Error::*;
match &err {
// Retry transaction errors immediately
Database(db_err) => {
Database(db_err)
if db_err
.message()
.contains("TransactionRetryWithProtoRefreshError")
{
tracing::warn!(message=%db_err.message(), "transaction retry");
}
.contains("TransactionRetryWithProtoRefreshError") =>
{
tracing::info!(message=%db_err.message(), "transaction retry");
tokio::time::sleep(TXN_RETRY).await;
}
// Retry internal errors with a backoff
Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
| WorkerCrashed => {
tracing::warn!(?err, "query retry");
backoff.tick().await;
Expand Down
9 changes: 5 additions & 4 deletions lib/chirp-workflow/core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ impl Registry {
}

pub struct RegistryWorkflow {
pub run:
for<'a> fn(
&'a mut WorkflowCtx,
) -> Pin<Box<dyn Future<Output = WorkflowResult<serde_json::Value>> + Send + 'a>>,
pub run: for<'a> fn(
&'a mut WorkflowCtx,
) -> Pin<
Box<dyn Future<Output = WorkflowResult<serde_json::Value>> + Send + 'a>,
>,
}

0 comments on commit 614846b

Please sign in to comment.