Skip to content

Commit

Permalink
feat(workflows): allow changing tags in workflow (#962)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Aug 2, 2024
1 parent 774da5c commit 01ecf86
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 10 deletions.
5 changes: 0 additions & 5 deletions lib/bolt/core/src/tasks/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ async fn generate_root(path: &Path) {
let _ = fs::remove_file(pkg.path().join("Cargo.lock")).await;

set_license(&pkg.path().join("Cargo.toml")).await;

let types_path = pkg.path().join("types");
if fs::metadata(&types_path).await.is_ok() {
set_license(&types_path.join("Cargo.toml")).await;
}
} else {
// Check worker
let worker_path = pkg.path().join("worker");
Expand Down
12 changes: 12 additions & 0 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, Workfl

#[derive(Clone)]
pub struct ActivityCtx {
workflow_id: Uuid,
ray_id: Uuid,
name: &'static str,
ts: i64,
Expand All @@ -20,6 +21,7 @@ pub struct ActivityCtx {

impl ActivityCtx {
pub fn new(
workflow_id: Uuid,
db: DatabaseHandle,
conn: &rivet_connection::Connection,
activity_create_ts: i64,
Expand All @@ -42,6 +44,7 @@ impl ActivityCtx {
op_ctx.from_workflow = true;

ActivityCtx {
workflow_id,
ray_id,
name,
ts,
Expand Down Expand Up @@ -76,6 +79,15 @@ impl ActivityCtx {
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
}

pub async fn update_workflow_tags(&self, tags: &serde_json::Value) -> GlobalResult<()> {
self.db
.update_workflow_tags(
self.workflow_id,
tags,
)
.await.map_err(GlobalError::raw)
}
}

impl ActivityCtx {
Expand Down
1 change: 1 addition & 0 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl WorkflowCtx {
create_ts: i64,
) -> WorkflowResult<A::Output> {
let ctx = ActivityCtx::new(
self.workflow_id,
self.db.clone(),
&self.conn,
self.create_ts,
Expand Down
5 changes: 5 additions & 0 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub trait Database: Send {
wake_sub_workflow: Option<Uuid>,
error: &str,
) -> WorkflowResult<()>;
async fn update_workflow_tags(
&self,
workflow_id: Uuid,
tags: &serde_json::Value,
) -> WorkflowResult<()>;

async fn commit_workflow_activity_event(
&self,
Expand Down
23 changes: 23 additions & 0 deletions lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,29 @@ impl Database for DatabasePostgres {
Ok(())
}

// TODO: Theres nothing preventing this from being able to be called from the workflow ctx also, but for
// now its only in the activity ctx so it isn't called again during workflow retries
async fn update_workflow_tags(
&self,
workflow_id: Uuid,
tags: &serde_json::Value,
) -> WorkflowResult<()> {
sqlx::query(indoc!(
"
UPDATE db_workflow.workflows
SET tags = $2
WHERE workflow_id = $1
",
))
.bind(workflow_id)
.bind(tags)
.execute(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;

Ok(())
}

async fn commit_workflow_activity_event(
&self,
workflow_id: Uuid,
Expand Down
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ where
// Implements `Executable` for any tuple size
macro_rules! impl_tuple {
($($args:ident),*) => {
#[::async_trait::async_trait]
#[async_trait::async_trait]
impl<$($args : Executable),*> Executable for ($($args),*) {
type Output = ($(<$args as Executable>::Output),*);

Expand Down
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod util {

pub use crate::{
activity::Activity,
signal::{Listen, Signal},
signal::{Listen, Signal, join_signal},
ctx::*,
db,
error::{WorkflowError, WorkflowResult},
Expand Down
15 changes: 13 additions & 2 deletions lib/chirp-workflow/core/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,22 @@ pub trait Listen: Sized {
/// ````
#[macro_export]
macro_rules! join_signal {
(pub $join:ident, [$($signals:ident),*]) => {
pub enum $join {
$($signals($signals)),*
}

join_signal!(@ $join, [$($signals),*]);
};
($join:ident, [$($signals:ident),*]) => {
enum $join {
$($signals($signals)),*
}

#[::async_trait::async_trait]
join_signal!(@ $join, [$($signals),*]);
};
(@ $join:ident, [$($signals:ident),*]) => {
#[async_trait::async_trait]
impl Listen for $join {
async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
let row = ctx.listen_any(&[$($signals::NAME),*]).await?;
Expand All @@ -73,5 +83,6 @@ macro_rules! join_signal {
}
}
}
}
};
}
pub use join_signal;
2 changes: 1 addition & 1 deletion svc/pkg/mm/util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ uuid = { version = "1", features = ["v4", "serde"] }
ip-info = { path = "../../ip/ops/info" }
mm-lobby-list-for-user-id = { path = "../ops/lobby-list-for-user-id" }
region-get = { path = "../../region/ops/get" }
user-identity-get = { path = "../../user-identity/ops/get" }
user-identity-get = { path = "../../user-identity/ops/get" }

0 comments on commit 01ecf86

Please sign in to comment.