Skip to content

Commit

Permalink
fix: add lost state for pb actors (#1378)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Nov 21, 2024
1 parent cdda797 commit 8baec56
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 18 deletions.
2 changes: 1 addition & 1 deletion packages/infra/client/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl Ctx {

self.event(protocol::Event::ActorStateUpdate {
actor_id: row.actor_id,
state: protocol::ActorState::Stopped,
state: protocol::ActorState::Lost,
})
.await?;
}
Expand Down
13 changes: 11 additions & 2 deletions packages/services/ds/src/workflows/server/pegboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,20 @@ pub(crate) async fn ds_server_pegboard(ctx: &mut WorkflowCtx, input: &Input) ->
}
}
pp::ActorState::Stopping | pp::ActorState::Stopped => {}
pp::ActorState::Exited { exit_code } => {
pp::ActorState::Exited { .. } | pp::ActorState::Lost => {
let exit_code = if let pp::ActorState::Exited { exit_code } = sig.state
{
exit_code
} else {
None
};

tracing::debug!(?exit_code, "actor stopped");

let failed = exit_code.map(|exit_code| exit_code != 0).unwrap_or(true);

// Reschedule durable actor if it errored
if input.lifecycle.durable && exit_code.unwrap_or(1) != 0 {
if input.lifecycle.durable && failed {
if let Some(sig) = reschedule_actor(ctx, &input).await? {
// Destroyed early
return Ok(Loop::Break(StateRes {
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE actors
ADD COLUMN lost_ts INT;
14 changes: 10 additions & 4 deletions packages/services/pegboard/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,19 @@ pub enum ActorState {
/// Actor planned to stop.
/// Sent by pegboard dc.
Stopping,
/// Actor stopped, process exit not yet received.
/// Sent by pegboard client and pegboard gc.
/// Actor stopped on client, process not yet exited.
/// Sent by pegboard client.
Stopped,
/// Actor was lost in some way and will never be marked as stopped (if not already) and will never exit.
/// Sent by pegboard client and pegboard gc.
Lost,
/// Actor process exited.
/// Sent by pegboard client.
Exited { exit_code: Option<i32> },
/// Actor failed to allocate to a client.
Exited {
/// Unset if the exit code could not be read (usually from SIGKILL or lost process)
exit_code: Option<i32>,
},
/// Datacenter failed to allocate the actor to a client.
/// Sent by pegboard dc.
FailedToAllocate,
}
Expand Down
15 changes: 15 additions & 0 deletions packages/services/pegboard/src/workflows/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,20 @@ async fn update_actor_state(
)
.await?;
}
Lost => {
sql_execute!(
[ctx]
"
UPDATE db_pegboard.actors
SET
lost_ts = $2
WHERE actor_id = $1
",
actor_id,
util::timestamp::now(),
)
.await?;
}
Exited { exit_code } => {
sql_execute!(
[ctx]
Expand All @@ -356,6 +370,7 @@ async fn update_actor_state(
)
.await?;
}
// These updates should never reach this workflow
Allocated { .. } | FailedToAllocate => bail!("invalid state for updating db"),
},
}
Expand Down
41 changes: 30 additions & 11 deletions packages/services/pegboard/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30);
/// How long to wait after stopping and not receiving a stop state before manually setting actor as
/// stopped.
const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30);
/// How long to wait after stopped and not receiving an exit state before manually setting actor as
/// exited.
const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5);

#[derive(sqlx::FromRow)]
struct ActorRow {
Expand Down Expand Up @@ -67,18 +70,32 @@ pub async fn run_from_env(
actor_id,
client_id,
running_ts IS NULL AS failed_start,
stopping_ts IS NOT NULL AS failed_stop
stopping_ts IS NOT NULL AS failed_stop,
stop_ts IS NOT NULL AS failed_exit
FROM db_pegboard.actors
WHERE
exit_ts IS NULL AND
lost_ts IS NULL AND
(
(create_ts < $1 AND running_ts IS NULL) OR
stopping_ts < $2
) AND
stop_ts IS NULL AND
exit_ts IS NULL
-- create_ts exceeded threshold and running_ts is null (failed start)
(
create_ts < $1 AND
running_ts IS NULL AND
stopping_ts IS NULL AND
stop_ts IS NULL
) OR
-- stopping_ts exceeded threshold and stop_ts is null (failed stop)
(
stopping_ts < $2 AND
stop_ts IS NULL
) OR
-- stop_ts exceeded threshold and exit_ts is null (failed exit)
stop_ts < $3
)
",
ts - ACTOR_START_THRESHOLD_MS,
ts - ACTOR_STOP_THRESHOLD_MS,
ts - ACTOR_EXIT_THRESHOLD_MS
),
)?;

Expand All @@ -92,24 +109,26 @@ pub async fn run_from_env(
}

for row in &failed_actor_rows {
if row.failed_stop {
tracing::warn!(actor_id=?row.actor_id, "actor failed to stop");
if row.failed_exit {
tracing::error!(actor_id=?row.actor_id, "actor failed to exit");

ctx.signal(pegboard::workflows::client::ActorStateUpdate {
state: protocol::ActorState::Stopped,
state: protocol::ActorState::Lost,
})
.tag("actor_id", row.actor_id)
.send()
.await?;
} else if row.failed_stop {
tracing::error!(actor_id=?row.actor_id, "actor failed to stop");

ctx.signal(pegboard::workflows::client::ActorStateUpdate {
state: protocol::ActorState::Exited { exit_code: None },
state: protocol::ActorState::Lost,
})
.tag("actor_id", row.actor_id)
.send()
.await?;
} else if row.failed_start {
tracing::warn!(actor_id=?row.actor_id, "actor failed to start");
tracing::error!(actor_id=?row.actor_id, "actor failed to start");

ctx.signal(protocol::Command::SignalActor {
actor_id: row.actor_id,
Expand Down

0 comments on commit 8baec56

Please sign in to comment.