Skip to content

Commit

Permalink
feat(pegboard): implement ws wf
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 10, 2024
1 parent 1eeedcb commit 36e552f
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 32 deletions.
9 changes: 9 additions & 0 deletions docs/libraries/workflow/GOTCHAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,12 @@ Be careful when writing your struct definitions.
When force waking a sleeping workflow by setting `wake_immediate = true`, know that if the workflow is
currently on a `sleep` step it will go back to sleep if it has not reached its `wake_deadline` yet. For all
other steps, the workflow will continue normally (usually just go back to sleep).

## Long-lived tasks in `ctx.join`

When executing multiple long-lived activities in a `ctx.join` call using a tuple, remember that internally it
uses `tokio::join!` and not `tokio::try_join`. This means it will wait until all items finish and does not
short circuit when an `Err` is returned from any branch.

So if you have an activity that errors immediately and another that takes a while to finish, the `ctx.join`
call will wait until the long task is complete (or errors) before returning.
56 changes: 29 additions & 27 deletions lib/bolt/core/src/context/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,33 +510,35 @@ impl ServiceContextData {
);
}

let can_depend =
if self.is_monolith_worker() {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::Consumer { .. }
)
} else if matches!(self.config().kind, ServiceKind::Api { .. }) {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::ApiRoutes { .. }
| ServiceKind::Consumer { .. }
)
} else {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::Consumer { .. }
)
};
let can_depend = if self.is_monolith_worker() {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. }
| ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::Consumer { .. }
)
} else if matches!(self.config().kind, ServiceKind::Api { .. }) {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. }
| ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::ApiRoutes { .. }
| ServiceKind::Consumer { .. }
)
} else {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. }
| ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::Consumer { .. }
)
};

if !can_depend {
panic!(
Expand Down
16 changes: 14 additions & 2 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,17 +541,29 @@ impl WorkflowCtx {
Ok(output)
}

/// Joins multiple executable actions (activities, closures) and awaits them simultaneously.
/// Joins multiple executable actions (activities, closures) and awaits them simultaneously. This does not
/// short circuit in the event of an error to make sure activity side effects are recorded.
pub async fn join<T: Executable>(&mut self, exec: T) -> GlobalResult<T::Output> {
exec.execute(self).await
}

/// Joins multiple executable actions (activities, closures) and awaits them simultaneously, short
/// circuiting in the event of an error.
///
/// **BEWARE**: You should almost **never** use `try_join` over `join`.
///
/// The only possible case for using this over `join` is:
/// - You have long running activities that are cancellable
pub async fn try_join<T: Executable>(&mut self, exec: T) -> GlobalResult<T::Output> {
exec.try_execute(self).await
}

/// Spawns a new thread to execute workflow steps in.
pub fn spawn<F, T: Send + 'static>(&mut self, f: F) -> tokio::task::JoinHandle<GlobalResult<T>>
where
F: for<'a> FnOnce(&'a mut WorkflowCtx) -> AsyncResult<'a, T> + Send + 'static,
{
let mut ctx = self.clone();
let mut ctx = self.branch();
tokio::task::spawn(async move { closure(f).execute(&mut ctx).await })
}

Expand Down
25 changes: 24 additions & 1 deletion lib/chirp-workflow/core/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ use crate::ctx::WorkflowCtx;
/// Signifies a retryable executable entity in a workflow. For example: activity, tuple of activities (join),
/// closure.
#[async_trait]
pub trait Executable: Send {
pub trait Executable: Send + Sized {
type Output: Send;

async fn execute(self, ctx: &mut WorkflowCtx) -> GlobalResult<Self::Output>;
/// In the event that an executable has multiple sub executables (i.e. a tuple of executables), this can
/// be implemented to provide the ability to choose whether to use `tokio::join!` or tokio::try_join!`
/// internally. Default implementation just calls `execute`.
async fn try_execute(self, ctx: &mut WorkflowCtx) -> GlobalResult<Self::Output> {
self.execute(ctx).await
}
}

pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = GlobalResult<T>> + Send + 'a>>;
Expand Down Expand Up @@ -59,6 +65,23 @@ macro_rules! impl_tuple {
// Handle errors here instead
Ok(($($args?),*))
}

async fn try_execute(self, ctx: &mut WorkflowCtx) -> GlobalResult<Self::Output> {
#[allow(non_snake_case)]
let ($($args),*) = self;

#[allow(non_snake_case)]
let ($(mut $args),*) = ($(
TupleHelper {
branch: ctx.step(),
exec: $args,
}
),*);

tokio::try_join!(
$($args.exec.execute(&mut $args.branch)),*
)
}
}
}
}
Expand Down
43 changes: 41 additions & 2 deletions svc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions svc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ members = [
"pkg/monolith/standalone/workflow-worker",
"pkg/nomad/standalone/monitor",
"pkg/nsfw/ops/image-score",
"pkg/pegboard",
"pkg/perf/ops/log-get",
"pkg/profanity/ops/check",
"pkg/region/ops/get",
Expand Down
11 changes: 11 additions & 0 deletions svc/pkg/pegboard/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "pegboard"
version = "0.0.1"
edition = "2018"
authors = ["Rivet Gaming, LLC <developer@rivet.gg>"]
license = "Apache-2.0"

[dependencies]
chirp-workflow = { path = "../../../lib/chirp-workflow/core" }
tokio-tungstenite = "0.23.1"
serde = { version = "1.0.198", features = ["derive"] }
7 changes: 7 additions & 0 deletions svc/pkg/pegboard/Service.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[service]
name = "pegboard"

[runtime]
kind = "rust"

[package]
14 changes: 14 additions & 0 deletions svc/pkg/pegboard/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use chirp_workflow::prelude::*;

pub mod ops;
pub mod utils;
pub mod workflows;

pub fn registry() -> WorkflowResult<Registry> {
use workflows::*;

let mut registry = Registry::new();
registry.register_workflow::<ws::Workflow>()?;

Ok(registry)
}
1 change: 1 addition & 0 deletions svc/pkg/pegboard/src/ops/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions svc/pkg/pegboard/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions svc/pkg/pegboard/src/workflows/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod ws;
81 changes: 81 additions & 0 deletions svc/pkg/pegboard/src/workflows/ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, RwLock},
};

use chirp_workflow::prelude::*;
use futures_util::FutureExt;
// use tokio::net::{TcpListener, TcpStream};
// use tokio_tungstenite::tungstenite::protocol::Message;

#[derive(Debug, Serialize, Deserialize)]
pub struct Input {}

#[workflow]
pub async fn pegboard_ws(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
// let addr = "127.0.0.1:8080";
// let listener = TcpListener::bind(&addr).await?;
// println!("Listening on: {}", addr);

let conns = Arc::new(RwLock::new(HashMap::<(), ()>::new()));

ctx.try_join((
closure(|ctx| socket_thread(ctx, conns.clone()).boxed()),
closure(|ctx| signal_thread(ctx, conns.clone()).boxed()),
))
.await?;

Ok(())
}

async fn socket_thread(
ctx: &mut WorkflowCtx,
conns: Arc<RwLock<HashMap<(), ()>>>,
) -> GlobalResult<()> {
ctx.repeat(|ctx| {
async move {
if let Ok((stream, addr)) = listener.accept().await {
handle_connection(stream, addr).await;
} else {
tracing::error!("failed to connect websocket");
}

Ok(Loop::Continue)
}.boxed()
)

Ok(())
}

async fn signal_thread(
ctx: &mut WorkflowCtx,
conns: Arc<RwLock<HashMap<(), ()>>>,
) -> GlobalResult<()> {
Ok(())
}

async fn handle_connection(ctx, raw_stream: TcpStream, addr: SocketAddr) {
ctx.spawn(|ctx| async move {
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
let (mut write, mut read) = ws_stream.split();

println!("New WebSocket connection: {}", addr);

while let Some(Ok(msg)) = read.next().await {
if msg.is_text() || msg.is_binary() {
write.send(msg).await?;
}
}

Ok(())
}.boxed()).await
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct FooInput {}

#[activity(Foo)]
async fn foo(ctx: &ActivityCtx, input: &FooInput) -> GlobalResult<()> {
Ok(())
}

0 comments on commit 36e552f

Please sign in to comment.