From 01ecf860783ad06fef5411722fb8ea217b23b620 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Fri, 2 Aug 2024 03:29:36 +0000 Subject: [PATCH] feat(workflows): allow changing tags in workflow (#962) ## Changes --- lib/bolt/core/src/tasks/gen.rs | 5 ----- lib/chirp-workflow/core/src/ctx/activity.rs | 12 +++++++++++ lib/chirp-workflow/core/src/ctx/workflow.rs | 1 + lib/chirp-workflow/core/src/db/mod.rs | 5 +++++ lib/chirp-workflow/core/src/db/postgres.rs | 23 +++++++++++++++++++++ lib/chirp-workflow/core/src/executable.rs | 2 +- lib/chirp-workflow/core/src/prelude.rs | 2 +- lib/chirp-workflow/core/src/signal.rs | 15 ++++++++++++-- svc/pkg/mm/util/Cargo.toml | 2 +- 9 files changed, 57 insertions(+), 10 deletions(-) diff --git a/lib/bolt/core/src/tasks/gen.rs b/lib/bolt/core/src/tasks/gen.rs index 0d6d276e4e..9511dba8df 100644 --- a/lib/bolt/core/src/tasks/gen.rs +++ b/lib/bolt/core/src/tasks/gen.rs @@ -125,11 +125,6 @@ async fn generate_root(path: &Path) { let _ = fs::remove_file(pkg.path().join("Cargo.lock")).await; set_license(&pkg.path().join("Cargo.toml")).await; - - let types_path = pkg.path().join("types"); - if fs::metadata(&types_path).await.is_ok() { - set_license(&types_path.join("Cargo.toml")).await; - } } else { // Check worker let worker_path = pkg.path().join("worker"); diff --git a/lib/chirp-workflow/core/src/ctx/activity.rs b/lib/chirp-workflow/core/src/ctx/activity.rs index ae7ad6696a..b2684c870c 100644 --- a/lib/chirp-workflow/core/src/ctx/activity.rs +++ b/lib/chirp-workflow/core/src/ctx/activity.rs @@ -6,6 +6,7 @@ use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, Workfl #[derive(Clone)] pub struct ActivityCtx { + workflow_id: Uuid, ray_id: Uuid, name: &'static str, ts: i64, @@ -20,6 +21,7 @@ pub struct ActivityCtx { impl ActivityCtx { pub fn new( + workflow_id: Uuid, db: DatabaseHandle, conn: &rivet_connection::Connection, activity_create_ts: i64, @@ -42,6 +44,7 @@ impl ActivityCtx { op_ctx.from_workflow = true; ActivityCtx { + workflow_id, ray_id, name, ts, @@ -76,6 +79,15 @@ impl ActivityCtx { .map_err(WorkflowError::OperationFailure) .map_err(GlobalError::raw) } + + pub async fn update_workflow_tags(&self, tags: &serde_json::Value) -> GlobalResult<()> { + self.db + .update_workflow_tags( + self.workflow_id, + tags, + ) + .await.map_err(GlobalError::raw) + } } impl ActivityCtx { diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 78d9006279..0523724b03 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -251,6 +251,7 @@ impl WorkflowCtx { create_ts: i64, ) -> WorkflowResult { let ctx = ActivityCtx::new( + self.workflow_id, self.db.clone(), &self.conn, self.create_ts, diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index 0e27439f8a..dbf1c16f5e 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -42,6 +42,11 @@ pub trait Database: Send { wake_sub_workflow: Option, error: &str, ) -> WorkflowResult<()>; + async fn update_workflow_tags( + &self, + workflow_id: Uuid, + tags: &serde_json::Value, + ) -> WorkflowResult<()>; async fn commit_workflow_activity_event( &self, diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index 53641c3801..f15d761f6d 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -341,6 +341,29 @@ impl Database for DatabasePostgres { Ok(()) } + // TODO: Theres nothing preventing this from being able to be called from the workflow ctx also, but for + // now its only in the activity ctx so it isn't called again during workflow retries + async fn update_workflow_tags( + &self, + workflow_id: Uuid, + tags: &serde_json::Value, + ) -> WorkflowResult<()> { + sqlx::query(indoc!( + " + UPDATE db_workflow.workflows + SET tags = $2 + WHERE workflow_id = $1 + ", + )) + .bind(workflow_id) + .bind(tags) + .execute(&mut *self.conn().await?) + .await + .map_err(WorkflowError::Sqlx)?; + + Ok(()) + } + async fn commit_workflow_activity_event( &self, workflow_id: Uuid, diff --git a/lib/chirp-workflow/core/src/executable.rs b/lib/chirp-workflow/core/src/executable.rs index 2790d5a1ee..e56e68785b 100644 --- a/lib/chirp-workflow/core/src/executable.rs +++ b/lib/chirp-workflow/core/src/executable.rs @@ -34,7 +34,7 @@ where // Implements `Executable` for any tuple size macro_rules! impl_tuple { ($($args:ident),*) => { - #[::async_trait::async_trait] + #[async_trait::async_trait] impl<$($args : Executable),*> Executable for ($($args),*) { type Output = ($(<$args as Executable>::Output),*); diff --git a/lib/chirp-workflow/core/src/prelude.rs b/lib/chirp-workflow/core/src/prelude.rs index 4703fa6311..a0d666bf2b 100644 --- a/lib/chirp-workflow/core/src/prelude.rs +++ b/lib/chirp-workflow/core/src/prelude.rs @@ -15,7 +15,7 @@ pub mod util { pub use crate::{ activity::Activity, - signal::{Listen, Signal}, + signal::{Listen, Signal, join_signal}, ctx::*, db, error::{WorkflowError, WorkflowResult}, diff --git a/lib/chirp-workflow/core/src/signal.rs b/lib/chirp-workflow/core/src/signal.rs index 9db2376667..d95dec1069 100644 --- a/lib/chirp-workflow/core/src/signal.rs +++ b/lib/chirp-workflow/core/src/signal.rs @@ -44,12 +44,22 @@ pub trait Listen: Sized { /// ```` #[macro_export] macro_rules! join_signal { + (pub $join:ident, [$($signals:ident),*]) => { + pub enum $join { + $($signals($signals)),* + } + + join_signal!(@ $join, [$($signals),*]); + }; ($join:ident, [$($signals:ident),*]) => { enum $join { $($signals($signals)),* } - #[::async_trait::async_trait] + join_signal!(@ $join, [$($signals),*]); + }; + (@ $join:ident, [$($signals:ident),*]) => { + #[async_trait::async_trait] impl Listen for $join { async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult { let row = ctx.listen_any(&[$($signals::NAME),*]).await?; @@ -73,5 +83,6 @@ macro_rules! join_signal { } } } - } + }; } +pub use join_signal; diff --git a/svc/pkg/mm/util/Cargo.toml b/svc/pkg/mm/util/Cargo.toml index 8c0e3ae204..ea1177459b 100644 --- a/svc/pkg/mm/util/Cargo.toml +++ b/svc/pkg/mm/util/Cargo.toml @@ -20,4 +20,4 @@ uuid = { version = "1", features = ["v4", "serde"] } ip-info = { path = "../../ip/ops/info" } mm-lobby-list-for-user-id = { path = "../ops/lobby-list-for-user-id" } region-get = { path = "../../region/ops/get" } -user-identity-get = { path = "../../user-identity/ops/get" } \ No newline at end of file +user-identity-get = { path = "../../user-identity/ops/get" }