Skip to content

Commit

Permalink
fix: fix signal history divergence (#1115)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Sep 4, 2024
1 parent 6659b34 commit 3cbfc1b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
7 changes: 3 additions & 4 deletions docs/libraries/workflow/DEVELOPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ If you run in to a too many retries error on a workflow, then:
For a single workflow:

```sql
UPDATE db_workflow.workflows SET wake_immediate = true WHERE workflow_id = 'MY_ID':uuid;
UPDATE db_workflow.workflows SET wake_immediate = true WHERE workflow_id = 'MY_ID';
```

For all workflows of a type:
Expand All @@ -26,12 +26,11 @@ For all workflows of a type:
UPDATE db_workflow.workflows SET wake_immediate = true WHERE workflow_name = 'MY_NAME';
```


# Visualize entire workflow history

```sql
WITH workflow_events AS (
SELECT '1db61ba2-6271-40a5-9a38-e6fa212e6f7d'::uuid AS workflow_id
SELECT 'WORKFLOW_ID'::uuid AS workflow_id
)
SELECT location, 'activity' AS t, activity_name, input, output, forgotten
FROM db_workflow.workflow_activity_events, workflow_events
Expand Down Expand Up @@ -69,7 +68,7 @@ ORDER BY location ASC;

```sql
WITH workflow_ids AS (
SELECT 'WORKFLOW_ID':uuid AS workflow_id
SELECT 'WORKFLOW_ID'::uuid AS workflow_id
),
delete_activity_events AS (
DELETE FROM db_workflow.workflow_activity_events
Expand Down
2 changes: 1 addition & 1 deletion infra/tf/grafana/grafana_dashboards/chirp-workflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@
"editorMode": "code",
"expr": "sum by (workflow_name, error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
"instant": false,
"legendFormat": "(workflow_name) {{error_code}}",
"legendFormat": "({{workflow_name}}) {{error_code}}",
"range": true,
"refId": "A"
}
Expand Down
12 changes: 6 additions & 6 deletions lib/bolt/cli/src/commands/cluster/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub enum SubCommand {
/// The name id of the cluster
#[clap(index = 1)]
cluster: String,
#[clap(long)]
#[clap(long, short = 's')]
server_id: Option<String>,
#[clap(long, short = 'p')]
pool: Option<String>,
Expand All @@ -24,7 +24,7 @@ pub enum SubCommand {
/// The name id of the cluster
#[clap(index = 1)]
cluster: String,
#[clap(long)]
#[clap(long, short = 's')]
server_id: Option<String>,
#[clap(long, short = 'p')]
pool: Option<String>,
Expand All @@ -38,7 +38,7 @@ pub enum SubCommand {
/// The name id of the cluster
#[clap(index = 1)]
cluster: String,
#[clap(long)]
#[clap(long, short = 's')]
server_id: Option<String>,
#[clap(long, short = 'p')]
pool: Option<String>,
Expand All @@ -52,7 +52,7 @@ pub enum SubCommand {
/// The name id of the cluster
#[clap(index = 1)]
cluster: String,
#[clap(long)]
#[clap(long, short = 's')]
server_id: Option<String>,
#[clap(long, short = 'p')]
pool: Option<String>,
Expand All @@ -66,7 +66,7 @@ pub enum SubCommand {
/// The name id of the cluster
#[clap(index = 1)]
cluster: String,
#[clap(long)]
#[clap(long, short = 's')]
server_id: Option<String>,
#[clap(long, short = 'p')]
pool: Option<String>,
Expand All @@ -80,7 +80,7 @@ pub enum SubCommand {
/// The name id of the cluster
#[clap(index = 1)]
cluster: String,
#[clap(long)]
#[clap(long, short = 's')]
server_id: Option<String>,
#[clap(long, short = 'p')]
pool: Option<String>,
Expand Down
14 changes: 8 additions & 6 deletions lib/chirp-workflow/core/src/builder/workflow/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
let event = self.ctx.current_history_event();

// Signal sent before
if let Some(event) = event {
let signal_id = if let Some(event) = event {
// Validate history is consistent
let Event::SignalSend(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
Expand All @@ -97,7 +97,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {

tracing::debug!(name=%self.ctx.name(), id=%self.ctx.workflow_id(), signal_name=%signal.name, signal_id=%signal.signal_id, "replaying signal dispatch");

Ok(signal.signal_id)
signal.signal_id
}
// Send signal
else {
Expand Down Expand Up @@ -149,10 +149,12 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
(None, true) => return Err(BuilderError::NoWorkflowIdOrTags.into()),
}

// Move to next event
self.ctx.inc_location();
signal_id
};

Ok(signal_id)
}
// Move to next event
self.ctx.inc_location();

Ok(signal_id)
}
}
1 change: 1 addition & 0 deletions svc/pkg/cluster/src/workflows/server/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ struct InstallOverSshInput {

#[activity(InstallOverSsh)]
#[timeout = 120]
#[max_retries = 10]
async fn install_over_ssh(ctx: &ActivityCtx, input: &InstallOverSshInput) -> GlobalResult<()> {
let public_ip = input.public_ip;
let private_key_openssh =
Expand Down

0 comments on commit 3cbfc1b

Please sign in to comment.