diff --git a/packages/common/chirp-workflow/core/src/metrics.rs b/packages/common/chirp-workflow/core/src/metrics.rs index bd8dd24faa..cf16c20184 100644 --- a/packages/common/chirp-workflow/core/src/metrics.rs +++ b/packages/common/chirp-workflow/core/src/metrics.rs @@ -93,21 +93,21 @@ lazy_static::lazy_static! { *REGISTRY, ).unwrap(); - pub static ref SIGNAL_PUBLISHED: IntGaugeVec = register_int_gauge_vec_with_registry!( + pub static ref SIGNAL_PUBLISHED: IntCounterVec = register_int_counter_vec_with_registry!( "chirp_workflow_signal_published", "Total published signals.", &["signal_name"], *REGISTRY, ).unwrap(); - pub static ref MESSAGE_PUBLISHED: IntGaugeVec = register_int_gauge_vec_with_registry!( + pub static ref MESSAGE_PUBLISHED: IntCounterVec = register_int_counter_vec_with_registry!( "chirp_workflow_message_published", "Total published messages.", &["message_name"], *REGISTRY, ).unwrap(); - pub static ref WORKFLOW_DISPATCHED: IntGaugeVec = register_int_gauge_vec_with_registry!( + pub static ref WORKFLOW_DISPATCHED: IntCounterVec = register_int_counter_vec_with_registry!( "chirp_workflow_workflow_dispatched", "Total dispatched workflows.", &["workflow_name"], diff --git a/packages/infra/client/config/src/runner_protocol.rs b/packages/infra/client/config/src/runner_protocol.rs index abad1d927d..a3c7850139 100644 --- a/packages/infra/client/config/src/runner_protocol.rs +++ b/packages/infra/client/config/src/runner_protocol.rs @@ -11,4 +11,6 @@ pub enum ToRunner { signal: i32, persist_state: bool, }, + // Kills the runner process + Terminate, } diff --git a/packages/infra/client/isolate-v8-runner/src/main.rs b/packages/infra/client/isolate-v8-runner/src/main.rs index 14d5cc686b..034881453f 100644 --- a/packages/infra/client/isolate-v8-runner/src/main.rs +++ b/packages/infra/client/isolate-v8-runner/src/main.rs @@ -94,7 +94,9 @@ async fn main() -> Result<()> { }; // Write exit code - if res.is_err() { + if let Err(err) = &res { + tracing::error!(?err); + fs::write(working_path.join("exit-code"), 1.to_string().as_bytes()).await?; } @@ -197,6 +199,7 @@ async fn handle_connection( tracing::warn!("Actor {actor_id} not found for stopping"); } } + runner_protocol::ToRunner::Terminate => bail!("Received terminate"), } } } diff --git a/packages/infra/client/manager/src/actor/mod.rs b/packages/infra/client/manager/src/actor/mod.rs index 89dba6a9e8..19be27ce8a 100644 --- a/packages/infra/client/manager/src/actor/mod.rs +++ b/packages/infra/client/manager/src/actor/mod.rs @@ -63,7 +63,7 @@ impl Actor { // Write actor to DB let config_json = serde_json::to_vec(&self.config)?; - utils::query(|| async { + utils::sql::query(|| async { // NOTE: On conflict here in case this query runs but the command is not acknowledged sqlx::query(indoc!( " @@ -201,7 +201,7 @@ impl Actor { } // Update DB - utils::query(|| async { + utils::sql::query(|| async { sqlx::query(indoc!( " UPDATE actors @@ -356,7 +356,7 @@ impl Actor { // Update stop_ts if matches!(signal, Signal::SIGTERM | Signal::SIGKILL) || !has_runner { - let stop_ts_set = utils::query(|| async { + let stop_ts_set = utils::sql::query(|| async { sqlx::query_as::<_, (bool,)>(indoc!( " UPDATE actors @@ -398,7 +398,7 @@ impl Actor { } // Update DB - utils::query(|| async { + utils::sql::query(|| async { sqlx::query(indoc!( " UPDATE actors @@ -417,7 +417,7 @@ impl Actor { .await?; // Unbind ports - utils::query(|| async { + utils::sql::query(|| async { sqlx::query(indoc!( " UPDATE actor_ports diff --git a/packages/infra/client/manager/src/actor/setup.rs b/packages/infra/client/manager/src/actor/setup.rs index e0305b2db1..43526f9fc2 100644 --- a/packages/infra/client/manager/src/actor/setup.rs +++ b/packages/infra/client/manager/src/actor/setup.rs @@ -808,7 +808,7 @@ async fn bind_ports_inner( let udp_offset = rand::thread_rng().gen_range(0..truncated_max); // Selects available TCP and UDP ports - let rows = utils::query(|| async { + let rows = utils::sql::query(|| async { sqlx::query_as::<_, (i64, i64)>(indoc!( " INSERT INTO actor_ports (actor_id, port, protocol) diff --git a/packages/infra/client/manager/src/ctx.rs b/packages/infra/client/manager/src/ctx.rs index e2c025f74f..a819c7b726 100644 --- a/packages/infra/client/manager/src/ctx.rs +++ b/packages/infra/client/manager/src/ctx.rs @@ -15,14 +15,22 @@ use futures_util::{ use indoc::indoc; use nix::unistd::Pid; use pegboard::{protocol, system_info::SystemInfo}; -use pegboard_config::{isolate_runner::Config as IsolateRunnerConfig, Client, Config}; +use pegboard_config::{ + isolate_runner::Config as IsolateRunnerConfig, runner_protocol, Client, Config, +}; use sqlx::{pool::PoolConnection, Sqlite, SqlitePool}; use tokio::{ fs, net::{TcpListener, TcpStream}, sync::{Mutex, RwLock}, }; -use tokio_tungstenite::{tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::{ + tungstenite::protocol::{ + frame::{coding::CloseCode, CloseFrame}, + Message, + }, + MaybeTlsStream, WebSocketStream, +}; use url::Url; use uuid::Uuid; @@ -120,7 +128,7 @@ impl Ctx { let event_json = serde_json::to_vec(event)?; // Fetch next idx - let index = utils::query(|| async { + let index = utils::sql::query(|| async { let mut conn = self.sql().await?; let mut txn = conn.begin_immediate().await?; @@ -184,15 +192,30 @@ impl Ctx { loop { match listener.accept().await { Ok((stream, _)) => { - let mut socket = tokio_tungstenite::accept_async(stream).await?; + let mut ws_stream = tokio_tungstenite::accept_async(stream).await?; tracing::info!("received new socket"); if let Some(runner) = &*self2.isolate_runner.read().await { - runner.attach_socket(socket).await?; + runner.attach_socket(ws_stream).await?; } else { - socket.close(None).await?; - bail!("no isolate runner to attach socket to"); + // HACK(RVT-4456): Until we find out where unknown runners (runners that the + // manager does not know about) come from, we just kill them + tracing::error!("killing unknown runner"); + + metrics::UNKNOWN_ISOLATE_RUNNER.with_label_values(&[]).inc(); + + ws_stream + .send(Message::Binary(serde_json::to_vec( + &runner_protocol::ToRunner::Terminate, + )?)) + .await?; + + let close_frame = CloseFrame { + code: CloseCode::Error, + reason: "unknown runner".into(), + }; + ws_stream.send(Message::Close(Some(close_frame))).await?; } } Err(err) => tracing::error!(?err, "failed to connect websocket"), @@ -216,7 +239,7 @@ impl Ctx { // Send init packet { - let (last_command_idx,) = utils::query(|| async { + let (last_command_idx,) = utils::sql::query(|| async { sqlx::query_as::<_, (i64,)>(indoc!( " SELECT last_command_idx FROM state @@ -323,7 +346,7 @@ impl Ctx { // Ack command tokio::try_join!( - utils::query(|| async { + utils::sql::query(|| async { sqlx::query(indoc!( " UPDATE state @@ -334,7 +357,7 @@ impl Ctx { .execute(&mut *self.sql().await?) .await }), - utils::query(|| async { + utils::sql::query(|| async { sqlx::query(indoc!( " INSERT INTO commands ( @@ -360,7 +383,7 @@ impl Ctx { /// Sends all events after the given idx. async fn rebroadcast(&self, last_event_idx: i64) -> Result<()> { // Fetch all missed events - let events = utils::query(|| async { + let events = utils::sql::query(|| async { sqlx::query_as::<_, (i64, Vec)>(indoc!( " SELECT idx, payload @@ -394,7 +417,7 @@ impl Ctx { let ((last_event_idx,), actor_rows) = tokio::try_join!( // There should not be any database operations going on at this point so it is safe to read this // value - utils::query(|| async { + utils::sql::query(|| async { sqlx::query_as::<_, (i64,)>(indoc!( " SELECT last_event_idx FROM state @@ -403,7 +426,7 @@ impl Ctx { .fetch_one(&mut *self.sql().await?) .await }), - utils::query(|| async { + utils::sql::query(|| async { sqlx::query_as::<_, ActorRow>(indoc!( " SELECT actor_id, config, pid, stop_ts @@ -426,7 +449,7 @@ impl Ctx { if row.pid.is_none() && row.stop_ts.is_none() { tracing::error!(actor_id=?row.actor_id, "actor has no pid, stopping"); - utils::query(|| async { + utils::sql::query(|| async { sqlx::query(indoc!( " UPDATE actors @@ -538,7 +561,7 @@ impl Ctx { self.observe_isolate_runner(&runner); // Save runner pid - utils::query(|| async { + utils::sql::query(|| async { sqlx::query(indoc!( " UPDATE state @@ -580,7 +603,7 @@ impl Ctx { *guard = None; // Update db state - let res = utils::query(|| async { + let res = utils::sql::query(|| async { sqlx::query(indoc!( " UPDATE state @@ -601,7 +624,7 @@ impl Ctx { /// Fetches isolate runner state from the db. Should be called before the manager's runner websocket opens. async fn rebuild_isolate_runner(self: &Arc) -> Result<()> { - let (isolate_runner_pid,) = utils::query(|| async { + let (isolate_runner_pid,) = utils::sql::query(|| async { sqlx::query_as::<_, (Option,)>(indoc!( " SELECT isolate_runner_pid diff --git a/packages/infra/client/manager/src/metrics/mod.rs b/packages/infra/client/manager/src/metrics/mod.rs index 618c44b3eb..8d2f628155 100644 --- a/packages/infra/client/manager/src/metrics/mod.rs +++ b/packages/infra/client/manager/src/metrics/mod.rs @@ -10,16 +10,37 @@ pub use server::run_standalone; lazy_static::lazy_static! { pub static ref PACKET_RECV_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( - "pegboard_packet_recv_total", + "packet_recv_total", "Total number of packets received.", &[], *REGISTRY, ).unwrap(); pub static ref PACKET_SEND_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( - "pegboard_packet_send_total", + "packet_send_total", "Total number of packets sent.", &[], *REGISTRY, ).unwrap(); + + pub static ref UNKNOWN_ISOLATE_RUNNER: IntCounterVec = register_int_counter_vec_with_registry!( + "unknown_isolate_runner", + "Total number of unknown isolate runners that were found and killed.", + &[], + *REGISTRY, + ).unwrap(); + + pub static ref DUPLICATE_ISOLATE_RUNNER: IntCounterVec = register_int_counter_vec_with_registry!( + "duplicate_isolate_runner", + "Total number of duplicate isolate runners that were found and killed.", + &[], + *REGISTRY, + ).unwrap(); + + pub static ref SQL_ERROR: IntCounterVec = register_int_counter_vec_with_registry!( + "sql_error", + "An SQL error occurred.", + &["error"], + *REGISTRY, + ).unwrap(); } diff --git a/packages/infra/client/manager/src/runner.rs b/packages/infra/client/manager/src/runner.rs index 08210f1a62..a5e5ed2589 100644 --- a/packages/infra/client/manager/src/runner.rs +++ b/packages/infra/client/manager/src/runner.rs @@ -22,9 +22,15 @@ use nix::{ }; use pegboard_config::runner_protocol; use tokio::{fs, net::TcpStream, sync::Mutex}; -use tokio_tungstenite::{tungstenite::protocol::Message, WebSocketStream}; +use tokio_tungstenite::{ + tungstenite::protocol::{ + frame::{coding::CloseCode, CloseFrame}, + Message, + }, + WebSocketStream, +}; -use crate::utils; +use crate::{metrics, utils}; /// How often to check that a PID is still running when observing actor state. const PID_POLL_INTERVAL: Duration = Duration::from_millis(1000); @@ -55,7 +61,7 @@ impl Handle { } } - pub async fn attach_socket(&self, ws_stream: WebSocketStream) -> Result<()> { + pub async fn attach_socket(&self, mut ws_stream: WebSocketStream) -> Result<()> { match &self.comms { Comms::Basic => bail!("attempt to attach socket to basic runner"), Comms::Socket(tx) => { @@ -63,50 +69,66 @@ impl Handle { let mut guard = tx.lock().await; - if guard.is_some() { - tracing::warn!(pid=?self.pid, "runner received another socket"); - } - - let (ws_tx, mut ws_rx) = ws_stream.split(); + if guard.is_none() { + let (ws_tx, mut ws_rx) = ws_stream.split(); + + *guard = Some(ws_tx); + + // Spawn a new thread to handle incoming messages + let self2 = self.clone(); + tokio::task::spawn(async move { + let kill = loop { + match tokio::time::timeout(PING_TIMEOUT, ws_rx.next()).await { + Ok(msg) => match msg { + Some(Ok(Message::Ping(_))) => {} + Some(Ok(Message::Close(_))) | None => { + tracing::debug!(pid=?self2.pid, "runner socket closed"); + break false; + } + Some(Ok(msg)) => { + tracing::warn!(pid=?self2.pid, ?msg, "unexpected message in runner socket") + } + Some(Err(err)) => { + tracing::error!(pid=?self2.pid, ?err, "runner socket error"); + break true; + } + }, + Err(_) => { + tracing::error!(pid=?self2.pid, "socket timed out, killing runner"); - *guard = Some(ws_tx); - - // Spawn a new thread to handle incoming messages - let self2 = self.clone(); - tokio::task::spawn(async move { - let kill = loop { - match tokio::time::timeout(PING_TIMEOUT, ws_rx.next()).await { - Ok(msg) => match msg { - Some(Ok(Message::Ping(_))) => {} - Some(Ok(Message::Close(_))) | None => { - tracing::debug!(pid=?self2.pid, "runner socket closed"); - break false; - } - Some(Ok(msg)) => { - tracing::warn!(pid=?self2.pid, ?msg, "unexpected message in runner socket") - } - Some(Err(err)) => { - tracing::error!(pid=?self2.pid, ?err, "runner socket error"); break true; } - }, - Err(_) => { - tracing::error!(pid=?self2.pid, "socket timed out, killing runner"); + } + }; - break true; + if kill { + if let Err(err) = self2.signal(Signal::SIGKILL) { + // TODO: This should hard error the manager? + tracing::error!(pid=?self2.pid, %err, "failed to kill runner"); } } - }; + }); - if kill { - if let Err(err) = self2.signal(Signal::SIGKILL) { - // TODO: This should hard error the manager? - tracing::error!(pid=?self2.pid, %err, "failed to kill runner"); - } - } - }); + tracing::info!(pid=?self.pid, "socket attached"); + } else { + tracing::warn!(pid=?self.pid, "runner received another socket, terminating new one"); - tracing::info!(pid=?self.pid, "socket attached"); + metrics::DUPLICATE_ISOLATE_RUNNER + .with_label_values(&[]) + .inc(); + + ws_stream + .send(Message::Binary(serde_json::to_vec( + &runner_protocol::ToRunner::Terminate, + )?)) + .await?; + + let close_frame = CloseFrame { + code: CloseCode::Error, + reason: "unknown runner".into(), + }; + ws_stream.send(Message::Close(Some(close_frame))).await?; + } } } diff --git a/packages/infra/client/manager/src/utils/mod.rs b/packages/infra/client/manager/src/utils/mod.rs index 2504476fdb..b78a36bf14 100644 --- a/packages/infra/client/manager/src/utils/mod.rs +++ b/packages/infra/client/manager/src/utils/mod.rs @@ -26,10 +26,6 @@ use pegboard_config::{Addresses, Config}; pub mod sql; -const MAX_QUERY_RETRIES: usize = 16; -const QUERY_RETRY: Duration = Duration::from_millis(500); -const TXN_RETRY: Duration = Duration::from_millis(250); - pub async fn init_dir(config: &Config) -> Result<()> { let data_dir = config.client.data_dir(); @@ -297,50 +293,6 @@ pub async fn fetch_pull_addresses(config: &Config) -> Result> { Ok(addresses) } -/// Executes queries and explicitly handles retry errors. -pub async fn query<'a, F, Fut, T>(mut cb: F) -> Result -where - F: FnMut() -> Fut, - Fut: std::future::Future> + 'a, - T: 'a, -{ - let mut i = 0; - - loop { - match cb().await { - std::result::Result::Ok(x) => return Ok(x), - std::result::Result::Err(err) => { - use sqlx::Error::*; - - if i > MAX_QUERY_RETRIES { - bail!("max sql retries: {err:?}"); - } - i += 1; - - match &err { - // Retry transaction errors immediately - Database(db_err) - if db_err - .message() - .contains("TransactionRetryWithProtoRefreshError") => - { - tracing::info!(message=%db_err.message(), "transaction retry"); - tokio::time::sleep(TXN_RETRY).await; - } - // Retry internal errors with a backoff - Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed - | WorkerCrashed => { - tracing::info!(?err, "query retry"); - tokio::time::sleep(QUERY_RETRY).await; - } - // Throw error - _ => return Err(err.into()), - } - } - } - } -} - pub fn now() -> i64 { time::SystemTime::now() .duration_since(time::UNIX_EPOCH) diff --git a/packages/infra/client/manager/src/utils/sql.rs b/packages/infra/client/manager/src/utils/sql.rs index f377ec1433..e06dceca51 100644 --- a/packages/infra/client/manager/src/utils/sql.rs +++ b/packages/infra/client/manager/src/utils/sql.rs @@ -1,11 +1,20 @@ use std::{ future::Future, ops::{Deref, DerefMut}, + result::Result::{Err, Ok}, + time::Duration, }; +use anyhow::*; use sqlx::sqlite::SqliteQueryResult; use sqlx::{Executor, SqliteConnection}; +use crate::metrics; + +const MAX_QUERY_RETRIES: usize = 16; +const QUERY_RETRY: Duration = Duration::from_millis(500); +const TXN_RETRY: Duration = Duration::from_millis(250); + pub(crate) trait SqliteConnectionExt { fn begin_immediate(&mut self) -> impl Future>; } @@ -65,3 +74,51 @@ impl<'c> DerefMut for Transaction<'c> { self.conn } } + +/// Executes queries and explicitly handles retry errors. +pub async fn query<'a, F, Fut, T>(mut cb: F) -> Result +where + F: FnMut() -> Fut, + Fut: std::future::Future> + 'a, + T: 'a, +{ + let mut i = 0; + + loop { + match cb().await { + std::result::Result::Ok(x) => return Ok(x), + std::result::Result::Err(err) => { + use sqlx::Error::*; + + metrics::SQL_ERROR + .with_label_values(&[&err.to_string()]) + .inc(); + + if i > MAX_QUERY_RETRIES { + bail!("max sql retries: {err:?}"); + } + i += 1; + + match &err { + // Retry transaction errors immediately + Database(db_err) + if db_err + .message() + .contains("TransactionRetryWithProtoRefreshError") => + { + tracing::info!(message=%db_err.message(), "transaction retry"); + tokio::time::sleep(TXN_RETRY).await; + } + // Retry internal errors with a backoff + Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed + | WorkerCrashed => { + tracing::info!(?err, "query retry"); + tokio::time::sleep(QUERY_RETRY).await; + } + // Throw error + _ => return Err(err.into()), + } + } + } + } +} diff --git a/packages/infra/client/manager/tests/common.rs b/packages/infra/client/manager/tests/common.rs index 7e2a20b8be..da386ae46d 100644 --- a/packages/infra/client/manager/tests/common.rs +++ b/packages/infra/client/manager/tests/common.rs @@ -126,7 +126,9 @@ pub async fn start_echo_actor( cluster: protocol::ActorMetadataCluster { cluster_id: Uuid::nil(), }, - build: protocol::ActorMetadataBuild { id: Uuid::nil() }, + build: protocol::ActorMetadataBuild { + build_id: Uuid::nil(), + }, }) .unwrap(), }), @@ -194,7 +196,9 @@ pub async fn start_js_echo_actor( cluster: protocol::ActorMetadataCluster { cluster_id: Uuid::nil(), }, - build: protocol::ActorMetadataBuild { id: Uuid::nil() }, + build: protocol::ActorMetadataBuild { + build_id: Uuid::nil(), + }, }) .unwrap(), }), @@ -270,14 +274,14 @@ pub async fn init_client(gen_path: &Path, working_path: &Path) -> Config { }, images: Images { // Should match the URL in `serve_binaries` - pull_addresses: Addresses::Static(vec![format!( + pull_addresses: Some(Addresses::Static(vec![format!( "http://127.0.0.1:{ARTIFACTS_PORT}" - )]), + )])), }, network: Network { bind_ip: "127.0.0.1".parse().unwrap(), - lan_ip: "127.0.0.1".parse().unwrap(), - wan_ip: "127.0.0.1".parse().unwrap(), + lan_hostname: "127.0.0.1".into(), + wan_hostname: "127.0.0.1".into(), lan_port_range_min: None, lan_port_range_max: None, wan_port_range_min: None, @@ -431,24 +435,24 @@ async fn build_runner(gen_path: &Path, variant: &str) { let image_name = format!("pegboard-{variant}-runner"); // Build runner binary - // let status = Command::new("docker") - // .arg("build") - // .arg("--platform") - // .arg("linux/amd64") - // .arg("-t") - // .arg(&image_name) - // .arg("-f") - // .arg( - // pkg_path - // .join(format!("{variant}-runner")) - // .join("Dockerfile"), - // ) - // .arg(pkg_path.join("..").join("..").join("..")) - // .status() - // .await - // .unwrap(); - - // assert!(status.success()); + let status = Command::new("docker") + .arg("build") + .arg("--platform") + .arg("linux/amd64") + .arg("-t") + .arg(&image_name) + .arg("-f") + .arg( + pkg_path + .join(format!("{variant}-runner")) + .join("Dockerfile"), + ) + .arg(pkg_path.join("..").join("..").join("..")) + .status() + .await + .unwrap(); + + assert!(status.success()); tracing::info!("copying runner image"); diff --git a/packages/services/cluster/src/workflows/server/install/install_scripts/mod.rs b/packages/services/cluster/src/workflows/server/install/install_scripts/mod.rs index cc2a750984..a5c9e0baea 100644 --- a/packages/services/cluster/src/workflows/server/install/install_scripts/mod.rs +++ b/packages/services/cluster/src/workflows/server/install/install_scripts/mod.rs @@ -146,7 +146,8 @@ pub async fn gen_initialize( prometheus_targets.insert( "pegboard".into(), components::vector::PrometheusTarget { - endpoint: "http://127.0.0.1:6000".into(), + // Should match port from pb manager config + endpoint: "http://127.0.0.1:6090".into(), scrape_interval: 15, }, ); @@ -160,7 +161,8 @@ pub async fn gen_initialize( prometheus_targets.insert( "pegboard".into(), components::vector::PrometheusTarget { - endpoint: "http://127.0.0.1:6000".into(), + // Should match port from pb manager config + endpoint: "http://127.0.0.1:6090".into(), scrape_interval: 15, }, );