Skip to content

Commit

Permalink
fix: rebuild isolate runner handle before starting runner socket
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Nov 18, 2024
1 parent a27ea00 commit be2641f
Showing 1 changed file with 144 additions and 135 deletions.
279 changes: 144 additions & 135 deletions packages/infra/client/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ impl Ctx {
}
});

// Rebuild isolate runner from db
self.rebuild_isolate_runner().await?;

// Start runner socket
let self2 = self.clone();
let runner_socket: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
Expand Down Expand Up @@ -340,7 +343,137 @@ impl Ctx {
Ok(())
}

// Should not be called before `rebuild`.
/// Sends all events after the given idx.
async fn rebroadcast(&self, last_event_idx: i64) -> Result<()> {
// Fetch all missed events
let events = utils::query(|| async {
sqlx::query_as::<_, (i64, Vec<u8>)>(indoc!(
"
SELECT idx, payload
FROM events
WHERE idx > ?1
",
))
.bind(last_event_idx)
.fetch_all(&mut *self.sql().await?)
.await
})
.await?
.into_iter()
.map(|(index, payload)| {
Ok(protocol::EventWrapper {
index,
inner: protocol::Raw::from_string(String::from_utf8_lossy(&payload).into())?,
})
})
.collect::<Result<Vec<_>>>()?;

if events.is_empty() {
return Ok(());
}

self.send_packet(protocol::ToServer::Events(events)).await
}

/// Rebuilds state from DB upon restart.
async fn rebuild(self: &Arc<Self>) -> Result<()> {
let actor_rows = utils::query(|| async {
sqlx::query_as::<_, ActorRow>(indoc!(
"
SELECT actor_id, config, pid, stop_ts
FROM actors
WHERE exit_ts IS NULL
",
))
.fetch_all(&mut *self.sql().await?)
.await
})
.await?;

let isolate_runner = { self.isolate_runner.read().await.clone() };

// NOTE: Sqlite doesn't support arrays, can't parallelize this easily
// Emit stop events
for row in &actor_rows {
if row.pid.is_none() && row.stop_ts.is_none() {
tracing::error!(actor_id=?row.actor_id, "actor has no pid, stopping");

utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE actors
SET stop_ts = ?2
WHERE actor_id = ?1
",
))
.bind(row.actor_id)
.bind(utils::now())
.execute(&mut *self.sql().await?)
.await
})
.await?;

self.event(protocol::Event::ActorStateUpdate {
actor_id: row.actor_id,
state: protocol::ActorState::Stopped,
})
.await?;
}
}

// Start actor observers
let mut actors_guard = self.actors.write().await;
for row in actor_rows {
let Some(pid) = row.pid else {
continue;
};

let config = serde_json::from_slice::<protocol::ActorConfig>(&row.config)?;

let runner = match &isolate_runner {
// We have to clone the existing isolate runner handle instead of creating a new one so it
// becomes a shared reference
Some(isolate_runner) if pid == isolate_runner.pid().as_raw() => {
isolate_runner.clone()
}
_ => match config.image.kind {
protocol::ImageKind::DockerImage | protocol::ImageKind::OciBundle => {
runner::Handle::from_pid(
runner::Comms::Basic,
Pid::from_raw(pid),
self.actor_path(row.actor_id),
)
}
protocol::ImageKind::JavaScript => runner::Handle::from_pid(
runner::Comms::socket(),
Pid::from_raw(pid),
self.actor_path(row.actor_id),
),
},
};

let actor = Actor::with_runner(row.actor_id, config, runner);
let actor = actors_guard.entry(row.actor_id).or_insert(actor);

let actor = actor.clone();
let self2 = self.clone();
tokio::spawn(async move {
use std::result::Result::Err;

if let Err(err) = actor.observe(&self2).await {
tracing::error!(actor_id=?row.actor_id, ?err, "observe failed");
}

// Cleanup afterwards
actor.cleanup(&self2).await
});
}

Ok(())
}
}

impl Ctx {
pub(crate) async fn get_or_spawn_isolate_runner(self: &Arc<Self>) -> Result<runner::Handle> {
let mut guard = self.isolate_runner.write().await;

Expand Down Expand Up @@ -435,67 +568,22 @@ impl Ctx {
});
}

/// Sends all events after the given idx.
async fn rebroadcast(&self, last_event_idx: i64) -> Result<()> {
// Fetch all missed events
let events = utils::query(|| async {
sqlx::query_as::<_, (i64, Vec<u8>)>(indoc!(
/// Fetches isolate runner state from the db. Should be called before the manager's runner websocket opens.
async fn rebuild_isolate_runner(self: &Arc<Self>) -> Result<()> {
let (isolate_runner_pid,) = utils::query(|| async {
sqlx::query_as::<_, (Option<i32>,)>(indoc!(
"
SELECT idx, payload
FROM events
WHERE idx > ?1
SELECT isolate_runner_pid
FROM state
",
))
.bind(last_event_idx)
.fetch_all(&mut *self.sql().await?)
.fetch_one(&mut *self.sql().await?)
.await
})
.await?
.into_iter()
.map(|(index, payload)| {
Ok(protocol::EventWrapper {
index,
inner: protocol::Raw::from_string(String::from_utf8_lossy(&payload).into())?,
})
})
.collect::<Result<Vec<_>>>()?;

if events.is_empty() {
return Ok(());
}

self.send_packet(protocol::ToServer::Events(events)).await
}

/// Rebuilds state from DB upon restart.
async fn rebuild(self: &Arc<Self>) -> Result<()> {
let (actor_rows, isolate_runner_pid) = tokio::try_join!(
utils::query(|| async {
sqlx::query_as::<_, ActorRow>(indoc!(
"
SELECT actor_id, config, pid, stop_ts
FROM actors
WHERE exit_ts IS NULL
",
))
.fetch_all(&mut *self.sql().await?)
.await
}),
utils::query(|| async {
sqlx::query_as::<_, (Option<i32>,)>(indoc!(
"
SELECT isolate_runner_pid
FROM state
",
))
.fetch_one(&mut *self.sql().await?)
.await
.map(|x| x.0)
}),
)?;
.await?;

// Recreate isolate runner handle
let isolate_runner = if let Some(isolate_runner_pid) = isolate_runner_pid {
if let Some(isolate_runner_pid) = isolate_runner_pid {
let mut guard = self.isolate_runner.write().await;

tracing::info!(?isolate_runner_pid, "found old isolate runner");
Expand All @@ -507,86 +595,7 @@ impl Ctx {
);
self.observe_isolate_runner(&runner);

*guard = Some(runner.clone());

Some(runner)
} else {
None
};

// NOTE: Sqlite doesn't support arrays, can't parallelize this easily
// Emit stop events
for row in &actor_rows {
if row.pid.is_none() && row.stop_ts.is_none() {
tracing::error!(actor_id=?row.actor_id, "actor has no pid, stopping");

utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE actors
SET stop_ts = ?2
WHERE actor_id = ?1
",
))
.bind(row.actor_id)
.bind(utils::now())
.execute(&mut *self.sql().await?)
.await
})
.await?;

self.event(protocol::Event::ActorStateUpdate {
actor_id: row.actor_id,
state: protocol::ActorState::Stopped,
})
.await?;
}
}

// Start actor observers
let mut actors_guard = self.actors.write().await;
for row in actor_rows {
let Some(pid) = row.pid else {
continue;
};

let config = serde_json::from_slice::<protocol::ActorConfig>(&row.config)?;

// Clone isolate runner handle
let runner = if Some(pid) == isolate_runner_pid {
isolate_runner.clone().expect("isolate runner must exist")
} else {
match config.image.kind {
protocol::ImageKind::DockerImage | protocol::ImageKind::OciBundle => {
runner::Handle::from_pid(
runner::Comms::Basic,
Pid::from_raw(pid),
self.actor_path(row.actor_id),
)
}
protocol::ImageKind::JavaScript => runner::Handle::from_pid(
runner::Comms::socket(),
Pid::from_raw(pid),
self.actor_path(row.actor_id),
),
}
};

let actor = Actor::with_runner(row.actor_id, config, runner);
let actor = actors_guard.entry(row.actor_id).or_insert(actor);

let actor = actor.clone();
let self2 = self.clone();
tokio::spawn(async move {
use std::result::Result::Err;

if let Err(err) = actor.observe(&self2).await {
tracing::error!(actor_id=?row.actor_id, ?err, "observe failed");
}

// Cleanup afterwards
actor.cleanup(&self2).await
});
*guard = Some(runner);
}

Ok(())
Expand Down

0 comments on commit be2641f

Please sign in to comment.