Skip to content

Commit

Permalink
fix: add pb manager debug metrics, handle unknown isolate runner grac…
Browse files Browse the repository at this point in the history
…efully
  • Loading branch information
MasterPtato committed Dec 31, 2024
1 parent 3415e28 commit 10be155
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 142 deletions.
6 changes: 3 additions & 3 deletions packages/common/chirp-workflow/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,21 @@ lazy_static::lazy_static! {
*REGISTRY,
).unwrap();

pub static ref SIGNAL_PUBLISHED: IntGaugeVec = register_int_gauge_vec_with_registry!(
pub static ref SIGNAL_PUBLISHED: IntCounterVec = register_int_counter_vec_with_registry!(
"chirp_workflow_signal_published",
"Total published signals.",
&["signal_name"],
*REGISTRY,
).unwrap();

pub static ref MESSAGE_PUBLISHED: IntGaugeVec = register_int_gauge_vec_with_registry!(
pub static ref MESSAGE_PUBLISHED: IntCounterVec = register_int_counter_vec_with_registry!(
"chirp_workflow_message_published",
"Total published messages.",
&["message_name"],
*REGISTRY,
).unwrap();

pub static ref WORKFLOW_DISPATCHED: IntGaugeVec = register_int_gauge_vec_with_registry!(
pub static ref WORKFLOW_DISPATCHED: IntCounterVec = register_int_counter_vec_with_registry!(
"chirp_workflow_workflow_dispatched",
"Total dispatched workflows.",
&["workflow_name"],
Expand Down
2 changes: 2 additions & 0 deletions packages/infra/client/config/src/runner_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ pub enum ToRunner {
signal: i32,
persist_state: bool,
},
// Kills the runner process
Terminate,
}
5 changes: 4 additions & 1 deletion packages/infra/client/isolate-v8-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ async fn main() -> Result<()> {
};

// Write exit code
if res.is_err() {
if let Err(err) = &res {
tracing::error!(?err);

fs::write(working_path.join("exit-code"), 1.to_string().as_bytes()).await?;
}

Expand Down Expand Up @@ -197,6 +199,7 @@ async fn handle_connection(
tracing::warn!("Actor {actor_id} not found for stopping");
}
}
runner_protocol::ToRunner::Terminate => bail!("Received terminate"),
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions packages/infra/client/manager/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Actor {
// Write actor to DB
let config_json = serde_json::to_vec(&self.config)?;

utils::query(|| async {
utils::sql::query(|| async {
// NOTE: On conflict here in case this query runs but the command is not acknowledged
sqlx::query(indoc!(
"
Expand Down Expand Up @@ -201,7 +201,7 @@ impl Actor {
}

// Update DB
utils::query(|| async {
utils::sql::query(|| async {
sqlx::query(indoc!(
"
UPDATE actors
Expand Down Expand Up @@ -356,7 +356,7 @@ impl Actor {

// Update stop_ts
if matches!(signal, Signal::SIGTERM | Signal::SIGKILL) || !has_runner {
let stop_ts_set = utils::query(|| async {
let stop_ts_set = utils::sql::query(|| async {
sqlx::query_as::<_, (bool,)>(indoc!(
"
UPDATE actors
Expand Down Expand Up @@ -398,7 +398,7 @@ impl Actor {
}

// Update DB
utils::query(|| async {
utils::sql::query(|| async {
sqlx::query(indoc!(
"
UPDATE actors
Expand All @@ -417,7 +417,7 @@ impl Actor {
.await?;

// Unbind ports
utils::query(|| async {
utils::sql::query(|| async {
sqlx::query(indoc!(
"
UPDATE actor_ports
Expand Down
2 changes: 1 addition & 1 deletion packages/infra/client/manager/src/actor/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ async fn bind_ports_inner(
let udp_offset = rand::thread_rng().gen_range(0..truncated_max);

// Selects available TCP and UDP ports
let rows = utils::query(|| async {
let rows = utils::sql::query(|| async {
sqlx::query_as::<_, (i64, i64)>(indoc!(
"
INSERT INTO actor_ports (actor_id, port, protocol)
Expand Down
57 changes: 40 additions & 17 deletions packages/infra/client/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ use futures_util::{
use indoc::indoc;
use nix::unistd::Pid;
use pegboard::{protocol, system_info::SystemInfo};
use pegboard_config::{isolate_runner::Config as IsolateRunnerConfig, Client, Config};
use pegboard_config::{
isolate_runner::Config as IsolateRunnerConfig, runner_protocol, Client, Config,
};
use sqlx::{pool::PoolConnection, Sqlite, SqlitePool};
use tokio::{
fs,
net::{TcpListener, TcpStream},
sync::{Mutex, RwLock},
};
use tokio_tungstenite::{tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::{
tungstenite::protocol::{
frame::{coding::CloseCode, CloseFrame},
Message,
},
MaybeTlsStream, WebSocketStream,
};
use url::Url;
use uuid::Uuid;

Expand Down Expand Up @@ -120,7 +128,7 @@ impl Ctx {
let event_json = serde_json::to_vec(event)?;

// Fetch next idx
let index = utils::query(|| async {
let index = utils::sql::query(|| async {
let mut conn = self.sql().await?;
let mut txn = conn.begin_immediate().await?;

Expand Down Expand Up @@ -184,15 +192,30 @@ impl Ctx {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let mut socket = tokio_tungstenite::accept_async(stream).await?;
let mut ws_stream = tokio_tungstenite::accept_async(stream).await?;

tracing::info!("received new socket");

if let Some(runner) = &*self2.isolate_runner.read().await {
runner.attach_socket(socket).await?;
runner.attach_socket(ws_stream).await?;
} else {
socket.close(None).await?;
bail!("no isolate runner to attach socket to");
// HACK(RVT-4456): Until we find out where unknown runners (runners that the
// manager does not know about) come from, we just kill them
tracing::error!("killing unknown runner");

metrics::UNKNOWN_ISOLATE_RUNNER.with_label_values(&[]).inc();

ws_stream
.send(Message::Binary(serde_json::to_vec(
&runner_protocol::ToRunner::Terminate,
)?))
.await?;

let close_frame = CloseFrame {
code: CloseCode::Error,
reason: "unknown runner".into(),
};
ws_stream.send(Message::Close(Some(close_frame))).await?;
}
}
Err(err) => tracing::error!(?err, "failed to connect websocket"),
Expand All @@ -216,7 +239,7 @@ impl Ctx {

// Send init packet
{
let (last_command_idx,) = utils::query(|| async {
let (last_command_idx,) = utils::sql::query(|| async {
sqlx::query_as::<_, (i64,)>(indoc!(
"
SELECT last_command_idx FROM state
Expand Down Expand Up @@ -323,7 +346,7 @@ impl Ctx {

// Ack command
tokio::try_join!(
utils::query(|| async {
utils::sql::query(|| async {
sqlx::query(indoc!(
"
UPDATE state
Expand All @@ -334,7 +357,7 @@ impl Ctx {
.execute(&mut *self.sql().await?)
.await
}),
utils::query(|| async {
utils::sql::query(|| async {
sqlx::query(indoc!(
"
INSERT INTO commands (
Expand All @@ -360,7 +383,7 @@ impl Ctx {
/// Sends all events after the given idx.
async fn rebroadcast(&self, last_event_idx: i64) -> Result<()> {
// Fetch all missed events
let events = utils::query(|| async {
let events = utils::sql::query(|| async {
sqlx::query_as::<_, (i64, Vec<u8>)>(indoc!(
"
SELECT idx, payload
Expand Down Expand Up @@ -394,7 +417,7 @@ impl Ctx {
let ((last_event_idx,), actor_rows) = tokio::try_join!(
// There should not be any database operations going on at this point so it is safe to read this
// value
utils::query(|| async {
utils::sql::query(|| async {
sqlx::query_as::<_, (i64,)>(indoc!(
"
SELECT last_event_idx FROM state
Expand All @@ -403,7 +426,7 @@ impl Ctx {
.fetch_one(&mut *self.sql().await?)
.await
}),
utils::query(|| async {
utils::sql::query(|| async {
sqlx::query_as::<_, ActorRow>(indoc!(
"
SELECT actor_id, config, pid, stop_ts
Expand All @@ -426,7 +449,7 @@ impl Ctx {
if row.pid.is_none() && row.stop_ts.is_none() {
tracing::error!(actor_id=?row.actor_id, "actor has no pid, stopping");

utils::query(|| async {
utils::sql::query(|| async {
sqlx::query(indoc!(
"
UPDATE actors
Expand Down Expand Up @@ -538,7 +561,7 @@ impl Ctx {
self.observe_isolate_runner(&runner);

// Save runner pid
utils::query(|| async {
utils::sql::query(|| async {
sqlx::query(indoc!(
"
UPDATE state
Expand Down Expand Up @@ -580,7 +603,7 @@ impl Ctx {
*guard = None;

// Update db state
let res = utils::query(|| async {
let res = utils::sql::query(|| async {
sqlx::query(indoc!(
"
UPDATE state
Expand All @@ -601,7 +624,7 @@ impl Ctx {

/// Fetches isolate runner state from the db. Should be called before the manager's runner websocket opens.
async fn rebuild_isolate_runner(self: &Arc<Self>) -> Result<()> {
let (isolate_runner_pid,) = utils::query(|| async {
let (isolate_runner_pid,) = utils::sql::query(|| async {
sqlx::query_as::<_, (Option<i32>,)>(indoc!(
"
SELECT isolate_runner_pid
Expand Down
25 changes: 23 additions & 2 deletions packages/infra/client/manager/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,37 @@ pub use server::run_standalone;

lazy_static::lazy_static! {
pub static ref PACKET_RECV_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"pegboard_packet_recv_total",
"packet_recv_total",
"Total number of packets received.",
&[],
*REGISTRY,
).unwrap();

pub static ref PACKET_SEND_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"pegboard_packet_send_total",
"packet_send_total",
"Total number of packets sent.",
&[],
*REGISTRY,
).unwrap();

pub static ref UNKNOWN_ISOLATE_RUNNER: IntCounterVec = register_int_counter_vec_with_registry!(
"unknown_isolate_runner",
"Total number of unknown isolate runners that were found and killed.",
&[],
*REGISTRY,
).unwrap();

pub static ref DUPLICATE_ISOLATE_RUNNER: IntCounterVec = register_int_counter_vec_with_registry!(
"duplicate_isolate_runner",
"Total number of duplicate isolate runners that were found and killed.",
&[],
*REGISTRY,
).unwrap();

pub static ref SQL_ERROR: IntCounterVec = register_int_counter_vec_with_registry!(
"sql_error",
"An SQL error occurred.",
&["error"],
*REGISTRY,
).unwrap();
}
Loading

0 comments on commit 10be155

Please sign in to comment.