From be2641f2421abde2bdb43929af6c482781390b85 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 18 Nov 2024 22:59:11 +0000 Subject: [PATCH] fix: rebuild isolate runner handle before starting runner socket --- packages/infra/client/manager/src/ctx.rs | 279 ++++++++++++----------- 1 file changed, 144 insertions(+), 135 deletions(-) diff --git a/packages/infra/client/manager/src/ctx.rs b/packages/infra/client/manager/src/ctx.rs index 6504145e7f..ae7d5d2870 100644 --- a/packages/infra/client/manager/src/ctx.rs +++ b/packages/infra/client/manager/src/ctx.rs @@ -172,6 +172,9 @@ impl Ctx { } }); + // Rebuild isolate runner from db + self.rebuild_isolate_runner().await?; + // Start runner socket let self2 = self.clone(); let runner_socket: tokio::task::JoinHandle> = tokio::spawn(async move { @@ -340,7 +343,137 @@ impl Ctx { Ok(()) } - // Should not be called before `rebuild`. + /// Sends all events after the given idx. + async fn rebroadcast(&self, last_event_idx: i64) -> Result<()> { + // Fetch all missed events + let events = utils::query(|| async { + sqlx::query_as::<_, (i64, Vec)>(indoc!( + " + SELECT idx, payload + FROM events + WHERE idx > ?1 + ", + )) + .bind(last_event_idx) + .fetch_all(&mut *self.sql().await?) + .await + }) + .await? + .into_iter() + .map(|(index, payload)| { + Ok(protocol::EventWrapper { + index, + inner: protocol::Raw::from_string(String::from_utf8_lossy(&payload).into())?, + }) + }) + .collect::>>()?; + + if events.is_empty() { + return Ok(()); + } + + self.send_packet(protocol::ToServer::Events(events)).await + } + + /// Rebuilds state from DB upon restart. + async fn rebuild(self: &Arc) -> Result<()> { + let actor_rows = utils::query(|| async { + sqlx::query_as::<_, ActorRow>(indoc!( + " + SELECT actor_id, config, pid, stop_ts + FROM actors + WHERE exit_ts IS NULL + ", + )) + .fetch_all(&mut *self.sql().await?) + .await + }) + .await?; + + let isolate_runner = { self.isolate_runner.read().await.clone() }; + + // NOTE: Sqlite doesn't support arrays, can't parallelize this easily + // Emit stop events + for row in &actor_rows { + if row.pid.is_none() && row.stop_ts.is_none() { + tracing::error!(actor_id=?row.actor_id, "actor has no pid, stopping"); + + utils::query(|| async { + sqlx::query(indoc!( + " + UPDATE actors + SET stop_ts = ?2 + WHERE actor_id = ?1 + ", + )) + .bind(row.actor_id) + .bind(utils::now()) + .execute(&mut *self.sql().await?) + .await + }) + .await?; + + self.event(protocol::Event::ActorStateUpdate { + actor_id: row.actor_id, + state: protocol::ActorState::Stopped, + }) + .await?; + } + } + + // Start actor observers + let mut actors_guard = self.actors.write().await; + for row in actor_rows { + let Some(pid) = row.pid else { + continue; + }; + + let config = serde_json::from_slice::(&row.config)?; + + let runner = match &isolate_runner { + // We have to clone the existing isolate runner handle instead of creating a new one so it + // becomes a shared reference + Some(isolate_runner) if pid == isolate_runner.pid().as_raw() => { + isolate_runner.clone() + } + _ => match config.image.kind { + protocol::ImageKind::DockerImage | protocol::ImageKind::OciBundle => { + runner::Handle::from_pid( + runner::Comms::Basic, + Pid::from_raw(pid), + self.actor_path(row.actor_id), + ) + } + protocol::ImageKind::JavaScript => runner::Handle::from_pid( + runner::Comms::socket(), + Pid::from_raw(pid), + self.actor_path(row.actor_id), + ), + }, + }; + + let actor = Actor::with_runner(row.actor_id, config, runner); + let actor = actors_guard.entry(row.actor_id).or_insert(actor); + + let actor = actor.clone(); + let self2 = self.clone(); + tokio::spawn(async move { + use std::result::Result::Err; + + if let Err(err) = actor.observe(&self2).await { + tracing::error!(actor_id=?row.actor_id, ?err, "observe failed"); + } + + // Cleanup afterwards + actor.cleanup(&self2).await + }); + } + + Ok(()) + } +} + +impl Ctx { pub(crate) async fn get_or_spawn_isolate_runner(self: &Arc) -> Result { let mut guard = self.isolate_runner.write().await; @@ -435,67 +568,22 @@ impl Ctx { }); } - /// Sends all events after the given idx. - async fn rebroadcast(&self, last_event_idx: i64) -> Result<()> { - // Fetch all missed events - let events = utils::query(|| async { - sqlx::query_as::<_, (i64, Vec)>(indoc!( + /// Fetches isolate runner state from the db. Should be called before the manager's runner websocket opens. + async fn rebuild_isolate_runner(self: &Arc) -> Result<()> { + let (isolate_runner_pid,) = utils::query(|| async { + sqlx::query_as::<_, (Option,)>(indoc!( " - SELECT idx, payload - FROM events - WHERE idx > ?1 + SELECT isolate_runner_pid + FROM state ", )) - .bind(last_event_idx) - .fetch_all(&mut *self.sql().await?) + .fetch_one(&mut *self.sql().await?) .await }) - .await? - .into_iter() - .map(|(index, payload)| { - Ok(protocol::EventWrapper { - index, - inner: protocol::Raw::from_string(String::from_utf8_lossy(&payload).into())?, - }) - }) - .collect::>>()?; - - if events.is_empty() { - return Ok(()); - } - - self.send_packet(protocol::ToServer::Events(events)).await - } - - /// Rebuilds state from DB upon restart. - async fn rebuild(self: &Arc) -> Result<()> { - let (actor_rows, isolate_runner_pid) = tokio::try_join!( - utils::query(|| async { - sqlx::query_as::<_, ActorRow>(indoc!( - " - SELECT actor_id, config, pid, stop_ts - FROM actors - WHERE exit_ts IS NULL - ", - )) - .fetch_all(&mut *self.sql().await?) - .await - }), - utils::query(|| async { - sqlx::query_as::<_, (Option,)>(indoc!( - " - SELECT isolate_runner_pid - FROM state - ", - )) - .fetch_one(&mut *self.sql().await?) - .await - .map(|x| x.0) - }), - )?; + .await?; // Recreate isolate runner handle - let isolate_runner = if let Some(isolate_runner_pid) = isolate_runner_pid { + if let Some(isolate_runner_pid) = isolate_runner_pid { let mut guard = self.isolate_runner.write().await; tracing::info!(?isolate_runner_pid, "found old isolate runner"); @@ -507,86 +595,7 @@ impl Ctx { ); self.observe_isolate_runner(&runner); - *guard = Some(runner.clone()); - - Some(runner) - } else { - None - }; - - // NOTE: Sqlite doesn't support arrays, can't parallelize this easily - // Emit stop events - for row in &actor_rows { - if row.pid.is_none() && row.stop_ts.is_none() { - tracing::error!(actor_id=?row.actor_id, "actor has no pid, stopping"); - - utils::query(|| async { - sqlx::query(indoc!( - " - UPDATE actors - SET stop_ts = ?2 - WHERE actor_id = ?1 - ", - )) - .bind(row.actor_id) - .bind(utils::now()) - .execute(&mut *self.sql().await?) - .await - }) - .await?; - - self.event(protocol::Event::ActorStateUpdate { - actor_id: row.actor_id, - state: protocol::ActorState::Stopped, - }) - .await?; - } - } - - // Start actor observers - let mut actors_guard = self.actors.write().await; - for row in actor_rows { - let Some(pid) = row.pid else { - continue; - }; - - let config = serde_json::from_slice::(&row.config)?; - - // Clone isolate runner handle - let runner = if Some(pid) == isolate_runner_pid { - isolate_runner.clone().expect("isolate runner must exist") - } else { - match config.image.kind { - protocol::ImageKind::DockerImage | protocol::ImageKind::OciBundle => { - runner::Handle::from_pid( - runner::Comms::Basic, - Pid::from_raw(pid), - self.actor_path(row.actor_id), - ) - } - protocol::ImageKind::JavaScript => runner::Handle::from_pid( - runner::Comms::socket(), - Pid::from_raw(pid), - self.actor_path(row.actor_id), - ), - } - }; - - let actor = Actor::with_runner(row.actor_id, config, runner); - let actor = actors_guard.entry(row.actor_id).or_insert(actor); - - let actor = actor.clone(); - let self2 = self.clone(); - tokio::spawn(async move { - use std::result::Result::Err; - - if let Err(err) = actor.observe(&self2).await { - tracing::error!(actor_id=?row.actor_id, ?err, "observe failed"); - } - - // Cleanup afterwards - actor.cleanup(&self2).await - }); + *guard = Some(runner); } Ok(())