feat(clusters): convert clusters to new workflow system
MasterPtato committed Jul 31, 2024
1 parent 80acec6 commit e4c16c9
Showing 367 changed files with 6,839 additions and 7,197 deletions.
5 changes: 5 additions & 0 deletions docs/libraries/workflow/DESIGN.md
@@ -0,0 +1,5 @@
# Design

## Hierarchy

TODO
37 changes: 35 additions & 2 deletions docs/libraries/workflow/GLOSSARY.md
@@ -15,8 +15,9 @@ A collection of registered workflows. This is solely used for the worker to fetc
A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or
sub workflow triggers.

Workflows can be thought of as a list of tasks. The code defining a workflow only specifies what items should
be run; there is no complex logic (e.g. database queries) running within the top level of the workflow.
Workflows can be thought of as an outline or a list of tasks. The code defining a workflow only specifies what
items should be run; there is no complex logic (e.g. database queries) running within the top level of the
workflow.

Upon an activity failure, workflow code can be rerun without duplicate side effects because activities are
cached and re-read after they succeed.
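
The replay behavior described above can be sketched with a small in-memory model. This is not the chirp-workflow API: `History`, `activity`, and `provision_workflow` are hypothetical names, and the real library persists activity outputs in a database rather than a `HashMap`. The sketch only shows why a rerun skips activities that already succeeded.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the persisted workflow history.
#[derive(Default)]
pub struct History {
    /// Outputs of activities that already succeeded, keyed by step name.
    outputs: HashMap<String, String>,
    /// Number of times activity code was actually invoked (including failed attempts).
    pub executions: u32,
}

impl History {
    /// Returns the cached output for `step` if one exists; otherwise runs the
    /// activity and caches its output only when it succeeds.
    pub fn activity<F>(&mut self, step: &str, activity: F) -> Result<String, String>
    where
        F: FnOnce() -> Result<String, String>,
    {
        if let Some(cached) = self.outputs.get(step) {
            return Ok(cached.clone());
        }
        self.executions += 1;
        let output = activity()?;
        self.outputs.insert(step.to_string(), output.clone());
        Ok(output)
    }
}

/// The workflow body is only an outline: it lists activities and wires outputs together.
pub fn provision_workflow(history: &mut History, fail_dns: bool) -> Result<String, String> {
    let server = history.activity("create_server", || Ok("server-1".to_string()))?;
    let dns = history.activity("create_dns", || {
        if fail_dns {
            Err("dns provider unavailable".to_string())
        } else {
            Ok("dns-1".to_string())
        }
    })?;
    Ok(format!("{}/{}", server, dns))
}
```

In this model, a first run that fails in `create_dns` invokes two activities; rerunning the workflow invokes only `create_dns` again because the `create_server` output is read back from the cache.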
@@ -27,6 +28,11 @@ A block of code that can fail. This cannot trigger other workflows or activities
Activities are retried by workflows when they fail or replayed when they succeed but a later part of the
workflow fails.

When choosing between a workflow and an activity:

- Choose a workflow when there are multiple steps that need to be individually retried upon failure.
- Choose an activity when there is only one chunk of retryable code that needs to be executed.

## Operation

Effectively a native Rust function. Can fail or not fail. Used for widely used operations like fetching a
@@ -51,6 +57,10 @@ this signal for it to be picked up, otherwise it will stay in the database indef
workflow. Signals do not have a response; another signal must be sent back from the workflow and listened to
by the sender.

### Differences from messages

Signals are like messages that can only be consumed by workflows and can only be consumed once.

## Tagged Signal

Same as a signal except it is sent with a JSON blob as its "tags" instead of to a specific workflow. Any
@@ -65,6 +75,29 @@ See [the signals document](./SIGNALS.md).
A "one of" for signal listening. Allows for listening to multiple signals at once and receiving the first one
that gets sent.

## Message

A payload that can be sent out of a workflow. Includes a JSON blob for tags which can be subscribed to with a
subscription.

### Differences from signals

Messages are like signals that can only be consumed by non-workflows and can be consumed by multiple
listeners.

## Subscription

An entity that waits for messages with the same (not a superset/subset) tags as itself. Upon receiving a
message, the message will be returned and the developer can choose to continue to listen for more messages.
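
The exact-match rule for subscription tags can be sketched as follows. This is an illustration under assumptions, not the library's implementation: the real tags are JSON blobs, while this sketch models them as a flat `BTreeMap<String, String>`, and `Tags`, `tags`, `tags_match`, and `deliver` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Hypothetical flat model of a tags blob (the real tags are JSON).
pub type Tags = BTreeMap<String, String>;

/// Convenience constructor for building tags from key/value pairs.
pub fn tags(pairs: &[(&str, &str)]) -> Tags {
    pairs
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// A subscription matches only when its tags equal the message tags exactly;
/// a superset or subset does not match.
pub fn tags_match(subscription: &Tags, message: &Tags) -> bool {
    subscription == message
}

/// Messages fan out: every matching subscription receives the message.
/// Returns the indices of the subscriptions that would receive it.
pub fn deliver(subscriptions: &[Tags], message: &Tags) -> Vec<usize> {
    subscriptions
        .iter()
        .enumerate()
        .filter(|(_, sub)| tags_match(sub, message))
        .map(|(i, _)| i)
        .collect()
}
```

With subscriptions tagged `{cluster_id: a}`, `{cluster_id: a, dc: x}`, and `{cluster_id: a}`, a message tagged `{cluster_id: a}` reaches the first and third but not the second.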

## Tail

Reads the last message without waiting. If none exists (all previous messages expired), `None` is returned.

## Tail w/ Anchor

Reads the earliest message after the given anchor timestamp or waits for one to be published if none exist.
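
The two tail variants can be contrasted with a small model of a message log. The names `StoredMessage`, `tail`, and `tail_anchor` are hypothetical, timestamps are plain milliseconds, and "after the anchor" is assumed to mean strictly greater than the anchor. The real tail with anchor waits when nothing qualifies; in this sketch `None` stands in for "would wait".

```rust
/// Hypothetical stored message: publish timestamp in milliseconds plus a body.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredMessage {
    pub ts: i64,
    pub body: String,
}

/// Tail: the most recent message, without waiting. `None` if nothing is stored.
pub fn tail(log: &[StoredMessage]) -> Option<&StoredMessage> {
    log.iter().max_by_key(|m| m.ts)
}

/// Tail with anchor: the earliest message published after `anchor_ts`.
/// `None` here represents the case where the real call would wait for a publish.
pub fn tail_anchor(log: &[StoredMessage], anchor_ts: i64) -> Option<&StoredMessage> {
    log.iter().filter(|m| m.ts > anchor_ts).min_by_key(|m| m.ts)
}
```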

## Workflow Event

An action that gets executed in a workflow. An event can be a:
20 changes: 20 additions & 0 deletions docs/libraries/workflow/GOTCHAS.md
@@ -0,0 +1,20 @@
# Gotchas

## Timestamps

Use timestamps with care when passing them between activity inputs/outputs. Because activity inputs need to be
consistent for replays, use `util::timestamp::now()` only within activities and not workflow bodies.

If you need a timestamp in a workflow body, use `ctx.create_ts()`, which is the time the workflow was created.
`ctx.ts()` is not consistent either because it marks the start of the current workflow run (which differs
between replays).

If you need a consistent current timestamp, create a new activity that just returns `util::timestamp::now()`.
This will be the current timestamp on the first execution of the activity and won't change on replay.

> **When an activity's input doesn't produce the same hash as the first time it was executed (i.e. its input
> changed), the entire workflow will error with "History Diverged" and will not restart.**

## Randomly generated content

Randomly generated content like UUIDs should be placed in activities for consistent history.
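
To make the "History Diverged" failure concrete, here is a minimal sketch of input-hash checking on replay. It is a model under assumptions, not the library's code: `History`, `ReplayError`, and `input_hash` are hypothetical names, the hash function is the standard library's `DefaultHasher`, and history lives in memory.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hashes an activity input so later replays can be compared against it.
fn input_hash<I: Hash>(input: &I) -> u64 {
    let mut hasher = DefaultHasher::new();
    input.hash(&mut hasher);
    hasher.finish()
}

#[derive(Debug, PartialEq)]
pub enum ReplayError {
    HistoryDiverged,
}

/// Hypothetical history: step name -> (input hash at first execution, output).
#[derive(Default)]
pub struct History {
    recorded: HashMap<String, (u64, i64)>,
}

impl History {
    pub fn activity<I: Hash>(
        &mut self,
        step: &str,
        input: &I,
        run: impl FnOnce(&I) -> i64,
    ) -> Result<i64, ReplayError> {
        let hash = input_hash(input);
        if let Some(&(recorded_hash, output)) = self.recorded.get(step) {
            // Replay: the input must hash to the value seen on the first execution.
            return if recorded_hash == hash {
                Ok(output)
            } else {
                Err(ReplayError::HistoryDiverged)
            };
        }
        // First execution: run the activity and record its input hash and output.
        let output = run(input);
        self.recorded.insert(step.to_string(), (hash, output));
        Ok(output)
    }
}
```

If the input were a timestamp or UUID generated in the workflow body, each replay would produce a different value and hit the `HistoryDiverged` branch. Generating the value inside an activity keeps it in the recorded output instead.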
7 changes: 0 additions & 7 deletions docs/libraries/workflow/SIGNALS.md

This file was deleted.

29 changes: 29 additions & 0 deletions docs/libraries/workflow/SIGNALS_AND_MESSAGES.md
@@ -0,0 +1,29 @@
# Signals

## Tagged signals

Tagged signals are consumed on a first-come, first-served basis because a single signal being consumed by more
than one workflow is not a supported design pattern. To work around this, consume the signal in one workflow,
then publish multiple signals from that workflow.
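
The workaround can be sketched with in-memory queues. `SignalQueue` and `fan_out` are hypothetical names used only for illustration; real signals are stored in the database and consumed by workflows.

```rust
use std::collections::VecDeque;

/// Hypothetical queue of pending signals for one listener.
#[derive(Default)]
pub struct SignalQueue {
    pending: VecDeque<String>,
}

impl SignalQueue {
    pub fn publish(&mut self, body: &str) {
        self.pending.push_back(body.to_string());
    }

    /// Consuming removes the signal, so only the first consumer ever sees it.
    pub fn consume(&mut self) -> Option<String> {
        self.pending.pop_front()
    }
}

/// One "fan-out" workflow consumes the signal once, then publishes a copy to
/// every target. Returns false if there was nothing to consume.
pub fn fan_out(source: &mut SignalQueue, targets: &mut [SignalQueue]) -> bool {
    match source.consume() {
        Some(body) => {
            for target in targets.iter_mut() {
                target.publish(&body);
            }
            true
        }
        None => false,
    }
}
```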

# Choosing Between Signals and Messages

> **Note**: The non-workflow ecosystem is the API layer, standalone, operations, and old workers.

## Signal

- Sending data from the non-workflow ecosystem to the workflow ecosystem
- Sending data from the workflow ecosystem to somewhere else in the workflow ecosystem

## Message

- Sending data from the workflow ecosystem to the non-workflow ecosystem
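
The rules in the two lists above reduce to a lookup on sender and receiver. The sketch below uses hypothetical `Ecosystem` and `Transport` enums and a hypothetical `choose_transport` function. Traffic that stays within the non-workflow ecosystem is not covered by this document, so the sketch returns `None` for it.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Ecosystem {
    Workflow,
    NonWorkflow,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Transport {
    Signal,
    Message,
}

/// Picks a transport from the direction of the data flow.
pub fn choose_transport(from: Ecosystem, to: Ecosystem) -> Option<Transport> {
    match (from, to) {
        // Anything delivered into the workflow ecosystem is a signal.
        (_, Ecosystem::Workflow) => Some(Transport::Signal),
        // Data leaving the workflow ecosystem is a message.
        (Ecosystem::Workflow, Ecosystem::NonWorkflow) => Some(Transport::Message),
        // Not covered by the rules above.
        (Ecosystem::NonWorkflow, Ecosystem::NonWorkflow) => None,
    }
}
```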

## Both Signals and Messages

Sometimes you may need to listen for a particular event in both the workflow ecosystem and the non-workflow ecosystem.
In this case you can publish both a signal and a message (you can derive `signal` and `message` on the same
struct to make this easier). Just remember: signals can only be consumed once.

Both messages and signals are meant to be payloads with a specific recipient. They are not meant to be
published without an intended target (i.e. for any listener to consume).
2 changes: 1 addition & 1 deletion fern/definition/admin/clusters/common.yml
@@ -51,7 +51,7 @@ types:
Server:
properties:
server_id: uuid
public_ip: string
public_ip: optional<string>

PoolUpdate:
properties:
3 changes: 2 additions & 1 deletion lib/bolt/cli/src/commands/cluster/mod.rs
@@ -234,7 +234,8 @@ impl SubCommand {
let server_ips = servers
.servers
.iter()
.map(|x| x.public_ip.as_str())
.filter_map(|x| x.public_ip.as_ref())
.map(|x| x.as_str())
.collect::<Vec<_>>();

// SSH in to servers
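
Now that `public_ip` is optional, servers without an address have to be skipped when collecting IPs. A minimal standalone sketch of that pattern follows. The `Server` struct here is a simplified stand-in rather than the generated API type, and `server_ips` is a hypothetical helper. `Option::as_deref` is used as a compact equivalent of the `filter_map(.. as_ref())` plus `map(.. as_str())` pair in the hunk above.

```rust
/// Simplified stand-in for the API's server type.
pub struct Server {
    pub server_id: String,
    pub public_ip: Option<String>,
}

/// Collects the public IPs of servers that have one, skipping the rest.
pub fn server_ips(servers: &[Server]) -> Vec<&str> {
    servers
        .iter()
        // `as_deref` turns `&Option<String>` into `Option<&str>`.
        .filter_map(|server| server.public_ip.as_deref())
        .collect()
}
```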
3 changes: 0 additions & 3 deletions lib/bolt/config/src/ns.rs
@@ -624,9 +624,6 @@ pub struct NsfwCheck {
pub struct Provisioning {
/// Default cluster.
pub cluster: Option<ProvisioningCluster>,
/// Whether or not to send a taint message in the next cluster update.
#[serde(default)]
pub taint: bool,
/// How many empty job servers to have at all times. Used in the simple provisioning algorithm on Rivet
/// Enterprise.
#[serde(default = "default_job_server_provision_margin")]
8 changes: 0 additions & 8 deletions lib/bolt/core/src/context/service.rs
@@ -1060,14 +1060,6 @@ impl ServiceContextData {
"RIVET_DEFAULT_CLUSTER_CONFIG".into(),
serde_json::to_string(&provisioning.cluster)?,
);
env.insert(
"RIVET_TAINT_DEFAULT_CLUSTER".into(),
if provisioning.taint {
"1".to_string()
} else {
"0".to_string()
},
);
}

if self.depends_on_provision_margin() {
6 changes: 0 additions & 6 deletions lib/bolt/core/src/tasks/gen.rs
@@ -162,12 +162,6 @@ async fn generate_root(path: &Path) {
}
}
}

// Utils lib
let util_path = pkg.path().join("util");
if fs::metadata(&util_path).await.is_ok() {
set_license(&util_path.join("Cargo.toml")).await;
}
}
}

75 changes: 45 additions & 30 deletions lib/chirp-workflow/core/src/compat.rs
@@ -1,14 +1,20 @@
// Forwards compatibility from old operation ctx to new workflows

use std::{fmt::Debug, time::Duration};
use std::fmt::Debug;

use global_error::prelude::*;
use serde::Serialize;
use uuid::Uuid;

use crate::{
ctx::api::WORKFLOW_TIMEOUT, DatabaseHandle, DatabasePostgres, Operation, OperationCtx,
OperationInput, Signal, Workflow, WorkflowError, WorkflowInput,
ctx::{
api::WORKFLOW_TIMEOUT,
message::{MessageCtx, SubscriptionHandle},
workflow::SUB_WORKFLOW_RETRY,
},
message::Message,
DatabaseHandle, DatabasePostgres, Operation, OperationCtx, OperationInput, Signal, Workflow,
WorkflowError, WorkflowInput,
};

pub async fn dispatch_workflow<I, B>(
@@ -83,30 +89,32 @@ where
}

/// Wait for a given workflow to complete.
/// **IMPORTANT:** Has no timeout.
/// 60 second timeout.
pub async fn wait_for_workflow<W: Workflow, B: Debug + Clone>(
ctx: &rivet_operation::OperationContext<B>,
workflow_id: Uuid,
) -> GlobalResult<W::Output> {
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");

let period = Duration::from_millis(50);
let mut interval = tokio::time::interval(period);
loop {
interval.tick().await;

// Check if state finished
let workflow = db_from_ctx(ctx)
.await?
.get_workflow(workflow_id)
.await
.map_err(GlobalError::raw)?
.ok_or(WorkflowError::WorkflowNotFound)
.map_err(GlobalError::raw)?;
if let Some(output) = workflow.parse_output::<W>().map_err(GlobalError::raw)? {
return Ok(output);
tokio::time::timeout(WORKFLOW_TIMEOUT, async move {
let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
loop {
interval.tick().await;

// Check if state finished
let workflow = db_from_ctx(ctx)
.await?
.get_workflow(workflow_id)
.await
.map_err(GlobalError::raw)?
.ok_or(WorkflowError::WorkflowNotFound)
.map_err(GlobalError::raw)?;
if let Some(output) = workflow.parse_output::<W>().map_err(GlobalError::raw)? {
return Ok(output);
}
}
}
})
.await?
}
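
The shape of the change above (a polling loop bounded by an overall deadline) can be sketched without an async runtime. This is a deliberately swapped-in technique: the commit wraps the loop in `tokio::time::timeout`, which cancels the future at the deadline, whereas this blocking sketch uses `std::thread::sleep` and checks the deadline between polls. `wait_for` and `WaitError` are hypothetical names.

```rust
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
pub enum WaitError {
    Timeout,
}

/// Calls `poll` every `interval` until it yields a value or `timeout` elapses.
pub fn wait_for<T>(
    timeout: Duration,
    interval: Duration,
    mut poll: impl FnMut() -> Option<T>,
) -> Result<T, WaitError> {
    let deadline = Instant::now() + timeout;
    loop {
        // Check for a finished result first so an already-complete job returns immediately.
        if let Some(output) = poll() {
            return Ok(output);
        }
        if Instant::now() >= deadline {
            return Err(WaitError::Timeout);
        }
        thread::sleep(interval);
    }
}
```

Placing the deadline inside the waiting function, as the commit does, means every caller gets the same bound and none of them needs its own timeout wrapper.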

/// Dispatch a new workflow and wait for it to complete. Has a 60s timeout.
@@ -121,11 +129,7 @@ where
{
let workflow_id = dispatch_workflow(ctx, input).await?;

tokio::time::timeout(
WORKFLOW_TIMEOUT,
wait_for_workflow::<I::Workflow, _>(ctx, workflow_id),
)
.await?
wait_for_workflow::<I::Workflow, _>(ctx, workflow_id).await
}

/// Dispatch a new workflow and wait for it to complete. Has a 60s timeout.
@@ -141,11 +145,7 @@ where
{
let workflow_id = dispatch_tagged_workflow(ctx, tags, input).await?;

tokio::time::timeout(
WORKFLOW_TIMEOUT,
wait_for_workflow::<I::Workflow, _>(ctx, workflow_id),
)
.await?
wait_for_workflow::<I::Workflow, _>(ctx, workflow_id).await
}

pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
@@ -226,6 +226,21 @@ where
.map_err(GlobalError::raw)
}

pub async fn subscribe<M, B>(
ctx: &rivet_operation::OperationContext<B>,
tags: &serde_json::Value,
) -> GlobalResult<SubscriptionHandle<M>>
where
M: Message,
B: Debug + Clone,
{
let msg_ctx = MessageCtx::new(ctx.conn(), ctx.req_id(), ctx.ray_id())
.await
.map_err(GlobalError::raw)?;

msg_ctx.subscribe::<M>(tags).await.map_err(GlobalError::raw)
}

// Get crdb pool as a trait object
async fn db_from_ctx<B: Debug + Clone>(
ctx: &rivet_operation::OperationContext<B>,
4 changes: 4 additions & 0 deletions lib/chirp-workflow/core/src/ctx/activity.rs
@@ -93,6 +93,10 @@ impl ActivityCtx {
self.name
}

pub fn workflow_id(&self) -> Uuid {
self.workflow_id
}

pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}