
fix: add unique wf dispatch
MasterPtato committed Dec 31, 2024
1 parent c5f7e9b commit 3415e28
Showing 7 changed files with 325 additions and 253 deletions.
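A minimal usage sketch of the new `unique()` builder flag, for orientation. The `ctx.workflow(...)` entry point, the `.tag(...)` method, and the `CleanupServerInput` type and tag name are illustrative assumptions, not taken from this commit:

// Sketch only: with `unique()`, dispatch is skipped when a workflow
// with this name and these tags already exists, and the existing
// workflow's id is returned instead of a new one.
let workflow_id = ctx
	.workflow(CleanupServerInput { server_id })
	.tag("server_id", server_id)
	.unique()
	.dispatch()
	.await?;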
77 changes: 64 additions & 13 deletions packages/common/chirp-workflow/core/src/builder/common/workflow.rs
@@ -18,6 +18,7 @@ pub struct WorkflowBuilder<I: WorkflowInput> {
ray_id: Uuid,
input: I,
tags: serde_json::Map<String, serde_json::Value>,
unique: bool,
error: Option<BuilderError>,
}

@@ -31,6 +32,7 @@ where
ray_id,
input,
tags: serde_json::Map::new(),
unique: false,
error: None,
}
}
@@ -65,6 +67,18 @@ where
self
}

/// Does not dispatch a workflow if one already exists with the given name and tags. Has no effect if no
/// tags are provided (will always spawn a new workflow).
pub fn unique(mut self) -> Self {
if self.error.is_some() {
return self;
}

self.unique = true;

self
}

pub async fn dispatch(self) -> GlobalResult<Uuid> {
if let Some(err) = self.error {
return Err(err.into());
@@ -77,29 +91,66 @@
let tags = serde_json::Value::Object(self.tags);
let tags = if no_tags { None } else { Some(&tags) };

tracing::debug!(
%workflow_name,
%workflow_id,
?tags,
input=?self.input,
"dispatching workflow"
);
if self.unique {
tracing::debug!(
%workflow_name,
?tags,
input=?self.input,
"dispatching unique workflow"
);
} else {
tracing::debug!(
%workflow_name,
%workflow_id,
?tags,
input=?self.input,
"dispatching workflow"
);
}

// Serialize input
let input_val = serde_json::value::to_raw_value(&self.input)
.map_err(WorkflowError::SerializeWorkflowOutput)
.map_err(GlobalError::raw)?;

self.db
.dispatch_workflow(self.ray_id, workflow_id, workflow_name, tags, &input_val)
let actual_workflow_id = self
.db
.dispatch_workflow(
self.ray_id,
workflow_id,
workflow_name,
tags,
&input_val,
self.unique,
)
.await
.map_err(GlobalError::raw)?;

metrics::WORKFLOW_DISPATCHED
.with_label_values(&[workflow_name])
.inc();
if self.unique {
if workflow_id == actual_workflow_id {
tracing::debug!(
%workflow_name,
%workflow_id,
?tags,
"dispatched unique workflow"
);
} else {
tracing::debug!(
%workflow_name,
workflow_id=%actual_workflow_id,
?tags,
"unique workflow already exists"
);
}
}

if workflow_id == actual_workflow_id {
metrics::WORKFLOW_DISPATCHED
.with_label_values(&[workflow_name])
.inc();
}

Ok(workflow_id)
Ok(actual_workflow_id)
}

pub async fn output(
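The builder now threads a `unique` flag into the database layer and uses the id it returns. A hedged reconstruction of the `dispatch_workflow` trait method, inferred only from the call site above (the real definition lives in one of the changed files not shown on this page; parameter names and exact types are guesses):

// Inferred sketch, not copied from this diff.
async fn dispatch_workflow(
	&self,
	ray_id: Uuid,
	workflow_id: Uuid,
	workflow_name: &str,
	tags: Option<&serde_json::Value>,
	input: &serde_json::value::RawValue,
	unique: bool,
) -> WorkflowResult<Uuid>; // the existing workflow's id when `unique` matches one, otherwise `workflow_id`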
169 changes: 107 additions & 62 deletions packages/common/chirp-workflow/core/src/builder/workflow/sub_workflow.rs
@@ -19,6 +19,7 @@ pub struct SubWorkflowBuilder<'a, I: WorkflowInput> {

input: I,
tags: serde_json::Map<String, serde_json::Value>,
unique: bool,
error: Option<BuilderError>,
}

@@ -33,6 +34,7 @@

input,
tags: serde_json::Map::new(),
unique: false,
error: None,
}
}
@@ -67,6 +69,18 @@ where
self
}

/// Does not dispatch a workflow if one already exists with the given name and tags. Has no effect if no
/// tags are provided (will always spawn a new workflow).
pub fn unique(mut self) -> Self {
if self.error.is_some() {
return self;
}

self.unique = true;

self
}

pub async fn dispatch(self) -> GlobalResult<Uuid> {
if let Some(err) = self.error {
return Err(err.into());
@@ -84,57 +98,18 @@
.compare_version("sub workflow", self.version)
.map_err(GlobalError::raw)?;

Self::dispatch_workflow_inner(self.ctx, self.version, self.input, tags)
Self::dispatch_workflow_inner(self.ctx, self.version, self.input, tags, self.unique)
.await
.map_err(GlobalError::raw)
}

pub async fn output(
self,
) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output> {
if let Some(err) = self.error {
return Err(err.into());
}

if !self.tags.is_empty() {
return Err(
BuilderError::TagsOnSubWorkflowOutputNotSupported(I::Workflow::NAME).into(),
);
}

// Err for version mismatch
self.ctx
.compare_version("sub workflow", self.version)
.map_err(GlobalError::raw)?;

let input_val = serde_json::value::to_raw_value(&self.input)
.map_err(WorkflowError::SerializeWorkflowInput)
.map_err(GlobalError::raw)?;
let mut branch = self
.ctx
.custom_branch(Arc::new(input_val), self.version)
.await
.map_err(GlobalError::raw)?;

tracing::debug!(name=%self.ctx.name(), id=%self.ctx.workflow_id(), sub_workflow_name=%I::Workflow::NAME, "running sub workflow");
// Run workflow
let output =
<<I as WorkflowInput>::Workflow as Workflow>::run(&mut branch, &self.input).await?;

// Validate no leftover events
branch.cursor().check_clear().map_err(GlobalError::raw)?;

// Move to next event
self.ctx.cursor_mut().update(branch.cursor().root());

Ok(output)
}

// This doesn't have a self parameter because self.tags was already moved (see above)
async fn dispatch_workflow_inner(
ctx: &mut WorkflowCtx,
version: usize,
input: I,
tags: Option<serde_json::Value>,
unique: bool,
) -> WorkflowResult<Uuid>
where
I: WorkflowInput,
@@ -162,21 +137,33 @@ where
let sub_workflow_name = I::Workflow::NAME;
let sub_workflow_id = Uuid::new_v4();

tracing::debug!(
name=%ctx.name(),
id=%ctx.workflow_id(),
%sub_workflow_name,
%sub_workflow_id,
?tags,
?input,
"dispatching sub workflow"
);
if unique {
tracing::debug!(
name=%ctx.name(),
id=%ctx.workflow_id(),
%sub_workflow_name,
?tags,
?input,
"dispatching unique sub workflow"
);
} else {
tracing::debug!(
name=%ctx.name(),
id=%ctx.workflow_id(),
%sub_workflow_name,
%sub_workflow_id,
?tags,
?input,
"dispatching sub workflow"
);
}

// Serialize input
let input_val = serde_json::value::to_raw_value(&input)
.map_err(WorkflowError::SerializeWorkflowOutput)?;

ctx.db()
let actual_sub_workflow_id = ctx
.db()
.dispatch_sub_workflow(
ctx.ray_id(),
ctx.workflow_id(),
@@ -187,20 +174,37 @@
tags.as_ref(),
&input_val,
ctx.loop_location(),
unique,
)
.await?;

tracing::debug!(
name=%ctx.name(),
id=%ctx.workflow_id(),
%sub_workflow_name,
?sub_workflow_id,
"sub workflow dispatched"
);
if unique {
if sub_workflow_id == actual_sub_workflow_id {
tracing::debug!(
name=%ctx.name(),
id=%ctx.workflow_id(),
%sub_workflow_name,
%sub_workflow_id,
?tags,
"dispatched unique sub workflow"
);
} else {
tracing::debug!(
name=%ctx.name(),
id=%ctx.workflow_id(),
%sub_workflow_name,
sub_workflow_id=%actual_sub_workflow_id,
?tags,
"unique sub workflow already exists"
);
}
}

metrics::WORKFLOW_DISPATCHED
.with_label_values(&[sub_workflow_name])
.inc();
if sub_workflow_id == actual_sub_workflow_id {
metrics::WORKFLOW_DISPATCHED
.with_label_values(&[sub_workflow_name])
.inc();
}

sub_workflow_id
};
@@ -210,4 +214,45 @@

Ok(id)
}

pub async fn output(
self,
) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output> {
if let Some(err) = self.error {
return Err(err.into());
}

if !self.tags.is_empty() {
return Err(
BuilderError::TagsOnSubWorkflowOutputNotSupported(I::Workflow::NAME).into(),
);
}

// Err for version mismatch
self.ctx
.compare_version("sub workflow", self.version)
.map_err(GlobalError::raw)?;

let input_val = serde_json::value::to_raw_value(&self.input)
.map_err(WorkflowError::SerializeWorkflowInput)
.map_err(GlobalError::raw)?;
let mut branch = self
.ctx
.custom_branch(Arc::new(input_val), self.version)
.await
.map_err(GlobalError::raw)?;

tracing::debug!(name=%self.ctx.name(), id=%self.ctx.workflow_id(), sub_workflow_name=%I::Workflow::NAME, "running sub workflow");
// Run workflow
let output =
<<I as WorkflowInput>::Workflow as Workflow>::run(&mut branch, &self.input).await?;

// Validate no leftover events
branch.cursor().check_clear().map_err(GlobalError::raw)?;

// Move to next event
self.ctx.cursor_mut().update(branch.cursor().root());

Ok(output)
}
}
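One consequence of returning the actual id: with `unique()`, two dispatches with the same name and tags should yield the same workflow id, and `WORKFLOW_DISPATCHED` is only incremented by the call that actually created the workflow. A test-style sketch of the expected semantics, using the same assumed entry point and input type as the sketch above:

// Both calls use identical name + tags; the second returns the id of
// the workflow created by the first rather than spawning another.
let first = ctx
	.workflow(CleanupServerInput { server_id })
	.tag("server_id", server_id)
	.unique()
	.dispatch()
	.await?;
let second = ctx
	.workflow(CleanupServerInput { server_id })
	.tag("server_id", server_id)
	.unique()
	.dispatch()
	.await?;
assert_eq!(first, second);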
2 changes: 1 addition & 1 deletion packages/common/chirp-workflow/core/src/ctx/workflow.rs
@@ -38,7 +38,7 @@ use crate::{
/// Poll interval when polling for signals in-process
const SIGNAL_RETRY: Duration = Duration::from_millis(100);
/// Most in-process signal poll tries
const MAX_SIGNAL_RETRIES: usize = 16;
const MAX_SIGNAL_RETRIES: usize = 4;
/// Most in-process sub workflow poll tries
const MAX_SUB_WORKFLOW_RETRIES: usize = 4;
/// Retry interval for failed db actions
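On the constant change above: with `SIGNAL_RETRY` at 100 ms, lowering `MAX_SIGNAL_RETRIES` from 16 to 4 shrinks the in-process signal polling window from roughly 16 × 100 ms = 1.6 s to 4 × 100 ms = 0.4 s per wait, presumably before the workflow falls back to its slower wait path; that fallback logic is outside the hunk shown here.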
(Diff for the remaining 4 changed files not shown.)
