Skip to content

Commit

Permalink
feat(workflows): add nats worker wake
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Aug 9, 2024
1 parent 7b399f4 commit 965f10e
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 52 deletions.
8 changes: 5 additions & 3 deletions lib/chirp-workflow/core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
workflow::SUB_WORKFLOW_RETRY,
OperationCtx,
},
db::{DatabaseHandle, DatabasePostgres},
db::{DatabaseHandle, DatabasePgNats},
error::WorkflowError,
message::Message,
operation::{Operation, OperationInput},
Expand Down Expand Up @@ -261,13 +261,15 @@ async fn db_from_ctx<B: Debug + Clone>(
ctx: &rivet_operation::OperationContext<B>,
) -> GlobalResult<DatabaseHandle> {
let crdb = ctx.crdb().await?;
let nats = ctx.conn().nats().await?;

Ok(DatabasePostgres::from_pool(crdb))
Ok(DatabasePgNats::from_pools(crdb, nats))
}

// Get crdb pool as a trait object
pub async fn db_from_pools(pools: &rivet_pools::Pools) -> GlobalResult<DatabaseHandle> {
let crdb = pools.crdb()?;
let nats = pools.nats()?;

Ok(DatabasePostgres::from_pool(crdb))
Ok(DatabasePgNats::from_pools(crdb, nats))
}
5 changes: 3 additions & 2 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
workflow::SUB_WORKFLOW_RETRY,
MessageCtx, OperationCtx,
},
db::{DatabaseHandle, DatabasePostgres},
db::{DatabaseHandle, DatabasePgNats},
error::WorkflowError,
message::{Message, ReceivedMessage},
operation::{Operation, OperationInput},
Expand Down Expand Up @@ -70,7 +70,8 @@ impl TestCtx {
(),
);

let db = DatabasePostgres::from_pool(pools.crdb().unwrap());
let db =
DatabasePgNats::from_pools(pools.crdb().unwrap(), pools.nats_option().clone().unwrap());
let msg_ctx = MessageCtx::new(&conn, req_id, ray_id).await.unwrap();

TestCtx {
Expand Down
9 changes: 6 additions & 3 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,10 @@ impl WorkflowCtx {

/// Tests if the given error is unrecoverable. If it is, allows the user to run recovery code safely.
/// Should always be used when trying to handle activity errors manually.
pub fn catch_unrecoverable<T>(&mut self, res: GlobalResult<T>) -> GlobalResult<GlobalResult<T>> {
pub fn catch_unrecoverable<T>(
&mut self,
res: GlobalResult<T>,
) -> GlobalResult<GlobalResult<T>> {
match res {
Err(err) if !err.is_workflow_recoverable() => {
self.location_idx += 1;
Expand Down Expand Up @@ -1082,7 +1085,7 @@ impl WorkflowCtx {
let location = self.full_location();

let (msg, write) = tokio::join!(
self.db.publish_message_from_workflow(
self.db.commit_workflow_message_send_event(
self.workflow_id,
location.as_ref(),
&tags,
Expand Down Expand Up @@ -1143,7 +1146,7 @@ impl WorkflowCtx {
let location = self.full_location();

let (msg, write) = tokio::join!(
self.db.publish_message_from_workflow(
self.db.commit_workflow_message_send_event(
self.workflow_id,
location.as_ref(),
&tags,
Expand Down
32 changes: 26 additions & 6 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use crate::{
workflow::Workflow,
};

mod postgres;
pub use postgres::DatabasePostgres;
mod pg_nats;
pub use pg_nats::DatabasePgNats;

pub type DatabaseHandle = Arc<dyn Database + Sync>;

#[async_trait::async_trait]
pub trait Database: Send {
/// Writes a new workflow to the database.
async fn dispatch_workflow(
&self,
ray_id: Uuid,
Expand All @@ -26,19 +27,22 @@ pub trait Database: Send {
input: serde_json::Value,
) -> WorkflowResult<()>;
async fn get_workflow(&self, id: Uuid) -> WorkflowResult<Option<WorkflowRow>>;

/// Pulls workflows for processing by the worker. Will only pull workflows with names matching the filter.
async fn pull_workflows(
&self,
worker_instance_id: Uuid,
filter: &[&str],
) -> WorkflowResult<Vec<PulledWorkflow>>;

// When a workflow is completed
/// Mark a workflow as completed.
async fn commit_workflow(
&self,
workflow_id: Uuid,
output: &serde_json::Value,
) -> WorkflowResult<()>;
// When a workflow fails

/// Write a workflow failure to the database.
async fn fail_workflow(
&self,
workflow_id: Uuid,
Expand All @@ -48,12 +52,15 @@ pub trait Database: Send {
wake_sub_workflow: Option<Uuid>,
error: &str,
) -> WorkflowResult<()>;

/// Updates workflow tags.
async fn update_workflow_tags(
&self,
workflow_id: Uuid,
tags: &serde_json::Value,
) -> WorkflowResult<()>;

/// Write a workflow activity event to history.
async fn commit_workflow_activity_event(
&self,
workflow_id: Uuid,
Expand All @@ -65,13 +72,16 @@ pub trait Database: Send {
loop_location: Option<&[usize]>,
) -> WorkflowResult<()>;

/// Pulls the oldest signal with the given filter. Pulls from regular and tagged signals.
async fn pull_next_signal(
&self,
workflow_id: Uuid,
filter: &[&str],
location: &[usize],
loop_location: Option<&[usize]>,
) -> WorkflowResult<Option<SignalRow>>;

/// Write a new signal to the database.
async fn publish_signal(
&self,
ray_id: Uuid,
Expand All @@ -80,6 +90,8 @@ pub trait Database: Send {
signal_name: &str,
body: serde_json::Value,
) -> WorkflowResult<()>;

/// Write a new tagged signal to the database.
async fn publish_tagged_signal(
&self,
ray_id: Uuid,
Expand All @@ -88,6 +100,8 @@ pub trait Database: Send {
signal_name: &str,
body: serde_json::Value,
) -> WorkflowResult<()>;

/// Write a new signal to the database. Contains extra info used to populate the history.
async fn publish_signal_from_workflow(
&self,
from_workflow_id: Uuid,
Expand All @@ -99,6 +113,8 @@ pub trait Database: Send {
body: serde_json::Value,
loop_location: Option<&[usize]>,
) -> WorkflowResult<()>;

/// Write a new tagged signal to the database. Contains extra info used to populate the history.
async fn publish_tagged_signal_from_workflow(
&self,
from_workflow_id: Uuid,
Expand All @@ -111,6 +127,7 @@ pub trait Database: Send {
loop_location: Option<&[usize]>,
) -> WorkflowResult<()>;

/// Publish a new workflow from an existing workflow.
async fn dispatch_sub_workflow(
&self,
ray_id: Uuid,
Expand All @@ -123,15 +140,17 @@ pub trait Database: Send {
loop_location: Option<&[usize]>,
) -> WorkflowResult<()>;

/// Fetches a workflow that has the given json as a subset of its input after the given ts.
/// Fetches a workflow that has the given json as a subset of its input after the given ts. Used primarily
/// in tests.
async fn poll_workflow(
&self,
name: &str,
input: &serde_json::Value,
after_ts: i64,
) -> WorkflowResult<Option<(Uuid, i64)>>;

async fn publish_message_from_workflow(
/// Writes a message send event to history.
async fn commit_workflow_message_send_event(
&self,
from_workflow_id: Uuid,
location: &[usize],
Expand All @@ -141,6 +160,7 @@ pub trait Database: Send {
loop_location: Option<&[usize]>,
) -> WorkflowResult<()>;

/// Updates a loop event in history and forgets all history items in the previous iteration.
async fn update_loop(
&self,
workflow_id: Uuid,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use indoc::indoc;
use rivet_pools::prelude::NatsPool;
use sqlx::{pool::PoolConnection, Acquire, PgPool, Postgres};
use tracing::Instrument;
use uuid::Uuid;

use super::{
Expand All @@ -13,44 +15,22 @@ use crate::{
activity::ActivityId,
error::{WorkflowError, WorkflowResult},
event::combine_events,
message,
};

/// Max amount of workflows pulled from the database with each call to `pull_workflows`.
const MAX_PULLED_WORKFLOWS: i64 = 10;
/// Maximum times a query ran bu this database adapter is retried.
const MAX_QUERY_RETRIES: usize = 16;

pub struct DatabasePostgres {
pub struct DatabasePgNats {
pool: PgPool,
nats: NatsPool,
}

impl DatabasePostgres {
pub async fn new(url: &str) -> WorkflowResult<Arc<DatabasePostgres>> {
let pool = sqlx::postgres::PgPoolOptions::new()
// The default connection timeout is too high
.acquire_timeout(Duration::from_secs(15))
// Increase lifetime to mitigate: https://github.com/launchbadge/sqlx/issues/2854
//
// See max lifetime https://www.cockroachlabs.com/docs/stable/connection-pooling#set-the-maximum-lifetime-of-connections
.max_lifetime(Duration::from_secs(30 * 60))
// Remove connections after a while in order to reduce load
// on CRDB after bursts
.idle_timeout(Some(Duration::from_secs(3 * 60)))
// Open connections immediately on startup
.min_connections(1)
// Raise the cap, since this is effectively the amount of
// simultaneous requests we can handle. See
// https://www.cockroachlabs.com/docs/stable/connection-pooling.html
.max_connections(4096)
.connect(url)
.await
.map_err(WorkflowError::BuildSqlx)?;

Ok(Arc::new(DatabasePostgres { pool }))
}

pub fn from_pool(pool: PgPool) -> Arc<DatabasePostgres> {
Arc::new(DatabasePostgres { pool })
impl DatabasePgNats {
pub fn from_pools(pool: PgPool, nats: NatsPool) -> Arc<DatabasePgNats> {
Arc::new(DatabasePgNats { pool, nats })
}

async fn conn(&self) -> WorkflowResult<PoolConnection<Postgres>> {
Expand All @@ -63,6 +43,29 @@ impl DatabasePostgres {
}
}

/// Spawns a new thread and publishes a worker wake message to nats.
fn wake_worker(&self) {
let nats = self.nats.clone();

let spawn_res = tokio::task::Builder::new()
.name("chirp_workflow::DatabasePgNats::wake")
.spawn(
async move {
// Fail gracefully
if let Err(err) = nats
.publish(message::WORKER_WAKE_SUBJECT, Vec::new().into())
.await
{
tracing::warn!(?err, "failed to publish wake message");
}
}
.in_current_span(),
);
if let Err(err) = spawn_res {
tracing::error!(?err, "failed to spawn wake task");
}
}

/// Executes queries and explicitly handles retry errors.
async fn query<'a, F, Fut, T>(&self, mut cb: F) -> WorkflowResult<T>
where
Expand Down Expand Up @@ -94,7 +97,7 @@ impl DatabasePostgres {
}

#[async_trait::async_trait]
impl Database for DatabasePostgres {
impl Database for DatabasePgNats {
async fn dispatch_workflow(
&self,
ray_id: Uuid,
Expand Down Expand Up @@ -124,6 +127,8 @@ impl Database for DatabasePostgres {
})
.await?;

self.wake_worker();

Ok(())
}

Expand Down Expand Up @@ -378,6 +383,8 @@ impl Database for DatabasePostgres {
})
.await?;

self.wake_worker();

Ok(())
}

Expand Down Expand Up @@ -417,6 +424,8 @@ impl Database for DatabasePostgres {
})
.await?;

self.wake_worker();

Ok(())
}

Expand Down Expand Up @@ -648,6 +657,8 @@ impl Database for DatabasePostgres {
})
.await?;

self.wake_worker();

Ok(())
}

Expand Down Expand Up @@ -678,6 +689,8 @@ impl Database for DatabasePostgres {
})
.await?;

self.wake_worker();

Ok(())
}

Expand Down Expand Up @@ -726,6 +739,8 @@ impl Database for DatabasePostgres {
})
.await?;

self.wake_worker();

Ok(())
}

Expand Down Expand Up @@ -774,6 +789,8 @@ impl Database for DatabasePostgres {
})
.await?;

self.wake_worker();

Ok(())
}

Expand Down Expand Up @@ -824,6 +841,8 @@ impl Database for DatabasePostgres {
})
.await?;

self.wake_worker();

Ok(())
}

Expand Down Expand Up @@ -852,7 +871,7 @@ impl Database for DatabasePostgres {
.map_err(WorkflowError::Sqlx)
}

async fn publish_message_from_workflow(
async fn commit_workflow_message_send_event(
&self,
from_workflow_id: Uuid,
location: &[usize],
Expand Down
Loading

0 comments on commit 965f10e

Please sign in to comment.