Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rebuild isolate runner handle before starting runner socket #1376

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 144 additions & 135 deletions packages/infra/client/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ impl Ctx {
}
});

// Rebuild isolate runner from db
self.rebuild_isolate_runner().await?;

// Start runner socket
let self2 = self.clone();
let runner_socket: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
Expand Down Expand Up @@ -338,7 +341,137 @@ impl Ctx {
Ok(())
}

// Should not be called before `rebuild`.
/// Sends all events after the given idx.
async fn rebroadcast(&self, last_event_idx: i64) -> Result<()> {
// Fetch all missed events
let events = utils::query(|| async {
sqlx::query_as::<_, (i64, Vec<u8>)>(indoc!(
"
SELECT idx, payload
FROM events
WHERE idx > ?1
",
))
.bind(last_event_idx)
.fetch_all(&mut *self.sql().await?)
.await
})
.await?
.into_iter()
.map(|(index, payload)| {
Ok(protocol::EventWrapper {
index,
inner: protocol::Raw::from_string(String::from_utf8_lossy(&payload).into())?,
})
})
.collect::<Result<Vec<_>>>()?;

if events.is_empty() {
return Ok(());
}

self.send_packet(protocol::ToServer::Events(events)).await
}

/// Rebuilds state from DB upon restart.
async fn rebuild(self: &Arc<Self>) -> Result<()> {
let actor_rows = utils::query(|| async {
sqlx::query_as::<_, ActorRow>(indoc!(
"
SELECT actor_id, config, pid, stop_ts
FROM actors
WHERE exit_ts IS NULL
",
))
.fetch_all(&mut *self.sql().await?)
.await
})
.await?;

let isolate_runner = { self.isolate_runner.read().await.clone() };

// NOTE: Sqlite doesn't support arrays, can't parallelize this easily
// Emit stop events
for row in &actor_rows {
if row.pid.is_none() && row.stop_ts.is_none() {
tracing::error!(actor_id=?row.actor_id, "actor has no pid, stopping");

utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE actors
SET stop_ts = ?2
WHERE actor_id = ?1
",
))
.bind(row.actor_id)
.bind(utils::now())
.execute(&mut *self.sql().await?)
.await
})
.await?;

self.event(protocol::Event::ActorStateUpdate {
actor_id: row.actor_id,
state: protocol::ActorState::Stopped,
})
.await?;
}
}

// Start actor observers
let mut actors_guard = self.actors.write().await;
for row in actor_rows {
let Some(pid) = row.pid else {
continue;
};

let config = serde_json::from_slice::<protocol::ActorConfig>(&row.config)?;

let runner = match &isolate_runner {
// We have to clone the existing isolate runner handle instead of creating a new one so it
// becomes a shared reference
Some(isolate_runner) if pid == isolate_runner.pid().as_raw() => {
isolate_runner.clone()
}
_ => match config.image.kind {
protocol::ImageKind::DockerImage | protocol::ImageKind::OciBundle => {
runner::Handle::from_pid(
runner::Comms::Basic,
Pid::from_raw(pid),
self.actor_path(row.actor_id),
)
}
protocol::ImageKind::JavaScript => runner::Handle::from_pid(
runner::Comms::socket(),
Pid::from_raw(pid),
self.actor_path(row.actor_id),
),
},
};

let actor = Actor::with_runner(row.actor_id, config, runner);
let actor = actors_guard.entry(row.actor_id).or_insert(actor);

let actor = actor.clone();
let self2 = self.clone();
tokio::spawn(async move {
use std::result::Result::Err;

if let Err(err) = actor.observe(&self2).await {
tracing::error!(actor_id=?row.actor_id, ?err, "observe failed");
}

// Cleanup afterwards
actor.cleanup(&self2).await
});
}

Ok(())
}
}

impl Ctx {
pub(crate) async fn get_or_spawn_isolate_runner(self: &Arc<Self>) -> Result<runner::Handle> {
let mut guard = self.isolate_runner.write().await;

Expand Down Expand Up @@ -435,67 +568,22 @@ impl Ctx {
});
}

/// Sends all events after the given idx.
async fn rebroadcast(&self, last_event_idx: i64) -> Result<()> {
// Fetch all missed events
let events = utils::query(|| async {
sqlx::query_as::<_, (i64, Vec<u8>)>(indoc!(
/// Fetches isolate runner state from the db. Should be called before the manager's runner websocket opens.
async fn rebuild_isolate_runner(self: &Arc<Self>) -> Result<()> {
let (isolate_runner_pid,) = utils::query(|| async {
sqlx::query_as::<_, (Option<i32>,)>(indoc!(
"
SELECT idx, payload
FROM events
WHERE idx > ?1
SELECT isolate_runner_pid
FROM state
",
))
.bind(last_event_idx)
.fetch_all(&mut *self.sql().await?)
.fetch_one(&mut *self.sql().await?)
.await
})
.await?
.into_iter()
.map(|(index, payload)| {
Ok(protocol::EventWrapper {
index,
inner: protocol::Raw::from_string(String::from_utf8_lossy(&payload).into())?,
})
})
.collect::<Result<Vec<_>>>()?;

if events.is_empty() {
return Ok(());
}

self.send_packet(protocol::ToServer::Events(events)).await
}

/// Rebuilds state from DB upon restart.
async fn rebuild(self: &Arc<Self>) -> Result<()> {
let (actor_rows, isolate_runner_pid) = tokio::try_join!(
utils::query(|| async {
sqlx::query_as::<_, ActorRow>(indoc!(
"
SELECT actor_id, config, pid, stop_ts
FROM actors
WHERE exit_ts IS NULL
",
))
.fetch_all(&mut *self.sql().await?)
.await
}),
utils::query(|| async {
sqlx::query_as::<_, (Option<i32>,)>(indoc!(
"
SELECT isolate_runner_pid
FROM state
",
))
.fetch_one(&mut *self.sql().await?)
.await
.map(|x| x.0)
}),
)?;
.await?;

// Recreate isolate runner handle
let isolate_runner = if let Some(isolate_runner_pid) = isolate_runner_pid {
if let Some(isolate_runner_pid) = isolate_runner_pid {
let mut guard = self.isolate_runner.write().await;

tracing::info!(?isolate_runner_pid, "found old isolate runner");
Expand All @@ -507,86 +595,7 @@ impl Ctx {
);
self.observe_isolate_runner(&runner);

*guard = Some(runner.clone());

Some(runner)
} else {
None
};

// NOTE: Sqlite doesn't support arrays, can't parallelize this easily
// Emit stop events
for row in &actor_rows {
if row.pid.is_none() && row.stop_ts.is_none() {
tracing::error!(actor_id=?row.actor_id, "actor has no pid, stopping");

utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE actors
SET stop_ts = ?2
WHERE actor_id = ?1
",
))
.bind(row.actor_id)
.bind(utils::now())
.execute(&mut *self.sql().await?)
.await
})
.await?;

self.event(protocol::Event::ActorStateUpdate {
actor_id: row.actor_id,
state: protocol::ActorState::Stopped,
})
.await?;
}
}

// Start actor observers
let mut actors_guard = self.actors.write().await;
for row in actor_rows {
let Some(pid) = row.pid else {
continue;
};

let config = serde_json::from_slice::<protocol::ActorConfig>(&row.config)?;

// Clone isolate runner handle
let runner = if Some(pid) == isolate_runner_pid {
isolate_runner.clone().expect("isolate runner must exist")
} else {
match config.image.kind {
protocol::ImageKind::DockerImage | protocol::ImageKind::OciBundle => {
runner::Handle::from_pid(
runner::Comms::Basic,
Pid::from_raw(pid),
self.actor_path(row.actor_id),
)
}
protocol::ImageKind::JavaScript => runner::Handle::from_pid(
runner::Comms::socket(),
Pid::from_raw(pid),
self.actor_path(row.actor_id),
),
}
};

let actor = Actor::with_runner(row.actor_id, config, runner);
let actor = actors_guard.entry(row.actor_id).or_insert(actor);

let actor = actor.clone();
let self2 = self.clone();
tokio::spawn(async move {
use std::result::Result::Err;

if let Err(err) = actor.observe(&self2).await {
tracing::error!(actor_id=?row.actor_id, ?err, "observe failed");
}

// Cleanup afterwards
actor.cleanup(&self2).await
});
*guard = Some(runner);
}

Ok(())
Expand Down
Loading