Skip to content

Commit

Permalink
feat: run sub workflows in the same process (#789)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Jun 5, 2024
1 parent 787971b commit 717e096
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 828 deletions.
16 changes: 14 additions & 2 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use global_error::{GlobalError, GlobalResult};
use rivet_pools::prelude::*;
use uuid::Uuid;

use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError};
use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError};

pub struct ActivityCtx {
ray_id: Uuid,
Expand All @@ -26,7 +26,19 @@ impl ActivityCtx {
name: &'static str,
) -> Self {
let ts = rivet_util::timestamp::now();
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, true, name, ts);
let req_id = Uuid::new_v4();
let conn = conn.wrap(req_id, ray_id, name);
let mut op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
req_id,
ray_id,
ts,
workflow_create_ts,
(),
);
op_ctx.from_workflow = true;

ActivityCtx {
ray_id,
Expand Down
16 changes: 14 additions & 2 deletions lib/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use global_error::{GlobalError, GlobalResult};
use rivet_pools::prelude::*;
use uuid::Uuid;

use crate::{util, DatabaseHandle, Operation, OperationInput, WorkflowError};
use crate::{DatabaseHandle, Operation, OperationInput, WorkflowError};

pub struct OperationCtx {
ray_id: Uuid,
Expand All @@ -27,7 +27,19 @@ impl OperationCtx {
name: &'static str,
) -> Self {
let ts = rivet_util::timestamp::now();
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, from_workflow, name, ts);
let req_id = Uuid::new_v4();
let conn = conn.wrap(req_id, ray_id, name);
let mut op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
req_id,
ray_id,
ts,
req_ts,
(),
);
op_ctx.from_workflow = from_workflow;

OperationCtx {
ray_id,
Expand Down
68 changes: 61 additions & 7 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,7 @@ impl WorkflowCtx {
}
}

// TODO(RVTEE-103): Run sub workflow inline as a branch of the parent workflow
/// Trigger another workflow and wait for its response.
/// Runs a sub workflow in the same process as the current workflow and returns its response.
pub async fn workflow<I>(
&mut self,
input: I,
Expand All @@ -430,11 +429,66 @@ impl WorkflowCtx {
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
let sub_workflow_id = self.dispatch_workflow(input).await?;
let output = self
.wait_for_workflow::<I::Workflow>(sub_workflow_id)
.await?;
Ok(output)
// Lookup workflow
let Ok(workflow) = self.registry.get_workflow(I::Workflow::name()) else {
tracing::warn!(
id=%self.workflow_id,
name=%I::Workflow::name(),
"sub workflow not found in current registry",
);

// TODO(RVT-3755): If a sub workflow is dispatched, then the worker is updated to include the sub
// worker in the registry, this will diverge in history because it will try to run the sub worker
// in process during the replay
// If the workflow isn't in the current registry, dispatch the workflow instead
let sub_workflow_id = self.dispatch_workflow(input).await?;
let output = self
.wait_for_workflow::<I::Workflow>(sub_workflow_id)
.await?;

return Ok(output);
};

tracing::info!(id=%self.workflow_id, name=%I::Workflow::name(), "running sub workflow");

// Create a new branched workflow context for the sub workflow
let mut ctx = WorkflowCtx {
workflow_id: self.workflow_id,
name: I::Workflow::name().to_string(),
create_ts: rivet_util::timestamp::now(),
ray_id: self.ray_id,

registry: self.registry.clone(),
db: self.db.clone(),

conn: self
.conn
.wrap(Uuid::new_v4(), self.ray_id, I::Workflow::name()),

event_history: self.event_history.clone(),

// TODO(RVT-3756): This is redundant with the deserialization in `workflow.run` in the registry
input: Arc::new(serde_json::to_value(input)?),

root_location: self
.root_location
.iter()
.cloned()
.chain(std::iter::once(self.location_idx))
.collect(),
location_idx: 0,
};

self.location_idx += 1;

// Run workflow
let output = (workflow.run)(&mut ctx).await?;

// TODO: RVT-3756
// Deserialize output
serde_json::from_value(output)
.map_err(WorkflowError::DeserializeWorkflowOutput)
.map_err(GlobalError::raw)
}

/// Run activity. Will replay on failure.
Expand Down
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl Database for DatabasePostgres {
INSERT INTO db_workflow.workflows (
workflow_id, workflow_name, create_ts, ray_id, input, wake_immediate
)
VALUES ($5, $2, $3, $4, $5, true)
VALUES ($7, $2, $3, $4, $5, true)
RETURNING 1
),
sub_workflow AS (
Expand Down
37 changes: 0 additions & 37 deletions lib/chirp-workflow/core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,40 +136,3 @@ pub(crate) fn new_conn(

rivet_connection::Connection::new(client, pools.clone(), cache.clone())
}

pub fn wrap_conn(
conn: &rivet_connection::Connection,
ray_id: Uuid,
req_ts: i64,
from_workflow: bool,
name: &str,
ts: i64,
) -> (
rivet_connection::Connection,
rivet_operation::OperationContext<()>,
) {
let req_id = Uuid::new_v4();
let trace_entry = chirp_client::TraceEntry {
context_name: name.to_string(),
req_id: Some(req_id.into()),
ts,
run_context: match rivet_util::env::run_context() {
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
} as i32,
};
let conn = conn.wrap(req_id, ray_id, trace_entry);
let mut op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
req_id,
ray_id,
ts,
req_ts,
(),
);
op_ctx.from_workflow = from_workflow;

(conn, op_ctx)
}
3 changes: 2 additions & 1 deletion lib/chirp-workflow/core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl Worker {

pub async fn start(mut self, pools: rivet_pools::Pools) -> GlobalResult<()> {
let mut interval = tokio::time::interval(TICK_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let shared_client = chirp_client::SharedClient::from_env(pools.clone())?;
let cache = rivet_cache::CacheInner::from_env(pools.clone())?;
Expand All @@ -31,7 +32,7 @@ impl Worker {
}
}

// Query the database for new workflows and run them.
/// Query the database for new workflows and run them.
async fn tick(
&mut self,
shared_client: &chirp_client::SharedClientHandle,
Expand Down
67 changes: 0 additions & 67 deletions lib/chirp-workflow/core/tests/basic.rs

This file was deleted.

33 changes: 0 additions & 33 deletions lib/chirp-workflow/core/tests/common.rs

This file was deleted.

Loading

0 comments on commit 717e096

Please sign in to comment.