Skip to content

Commit

Permalink
fix(pegboard): implement cleanup, rebuild, fix queries (#1150)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Oct 9, 2024
1 parent 166991e commit 2bc957b
Show file tree
Hide file tree
Showing 14 changed files with 796 additions and 440 deletions.
16 changes: 14 additions & 2 deletions docs/libraries/workflow/GOTCHAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,25 @@ the internal location.
`std::collections::HashMap` does not implement `Hash`. To get around this, declare the field as `util::serde::HashableMap` and convert the original map when building the value:

```rust
use util::serde::AsHashableExt;

struct Input {
map: HashMap<..., ...>,
}

// ...

ctx
.activity(MyActivityInput {
map: input.map.as_hashable(),
map: input.map.into(),
})
.await?;

// ...

struct MyActivityInput {
map: util::serde::HashableMap<..., ...>,
}

```

## Nested options with serde
Expand Down
4 changes: 3 additions & 1 deletion lib/bolt/core/src/tasks/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ async fn ip_inner(
"root",
"-i",
ssh_key.path(),
ip
"-L",
"9090:10.0.0.84:8080",
ip,
)
.run()
}
Expand Down
18 changes: 11 additions & 7 deletions lib/pegboard/container-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ fn main() -> anyhow::Result<()> {
Some(x) if x == "dynamic_server" => Stakeholder::DynamicServer {
server_id: var("PEGBOARD_META_server_id")?,
},
Some(x) => bail!("invalid container manager: {x}"),
None => bail!("no container manager specified"),
Some(x) => bail!("invalid container stakeholder: {x}"),
None => bail!("no container stakeholder specified"),
};

let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);
Expand Down Expand Up @@ -112,8 +112,13 @@ fn run_container(
pegboard_container_dir: &Path,
root_user_enabled: bool,
) -> anyhow::Result<i32> {
let container_id = fs::read_to_string(pegboard_container_dir.join("container-id"))
.context("failed to read container-id")?;
// Extract container id from dir
let container_id = pegboard_container_dir
.iter()
.last()
.context("empty `pegboard_container_dir`")?
.to_string_lossy()
.to_string();
let oci_bundle_path = pegboard_container_dir.join("oci-bundle");
let oci_bundle_config_json = oci_bundle_path.join("config.json");

Expand Down Expand Up @@ -167,14 +172,13 @@ fn run_container(
// This will wait for the child to exit and then exit itself so we have time to ship all of the
// required logs
let mut signals = Signals::new(&[SIGTERM])?;
let runc_container_id = container_id.clone();
thread::spawn(move || {
for _ in signals.forever() {
println!("Received SIGTERM, forwarding to runc container {runc_container_id}");
println!("Received SIGTERM, forwarding to runc container {container_id}");
let status = Command::new("runc")
.arg("kill")
.arg("--all")
.arg(&runc_container_id)
.arg(&container_id)
.arg("SIGTERM")
.status();
println!("runc kill status: {:?}", status);
Expand Down
2 changes: 0 additions & 2 deletions lib/pegboard/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ license = "Apache-2.0"
anyhow = "1.0.79"
futures-util = { version = "0.3" }
indoc = "2.0"
lz4_flex = "0.11"
nix = { version = "0.27", default-features = false, features = ["user", "signal"] }
notify = { version = "6.1.1", default-features = false, features = [ "serde" ] }
reqwest = { version = "0.11", features = ["stream"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
tar = "0.4.41"
tokio = { version = "1.27", default-features = false, features = ["fs", "process", "macros", "rt", "rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false, features = ["io-util"] }
tracing = "0.1"
Expand Down
134 changes: 74 additions & 60 deletions lib/pegboard/manager/src/container/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
time::Duration,
};

use anyhow::*;
Expand All @@ -23,8 +23,8 @@ mod setup;

/// How often to check for a PID when one is not present and a stop command was received.
const STOP_PID_INTERVAL: Duration = std::time::Duration::from_millis(250);
/// How long to wait until no longer waiting for a PID when a stop command was received.
const STOP_PID_TIMEOUT: Duration = std::time::Duration::from_secs(30);
/// How many times to check for a PID when a stop command was received.
const STOP_PID_RETRIES: usize = 32;
/// How often to check that a PID is still running when observing container state.
const PID_POLL_INTERVAL: Duration = std::time::Duration::from_millis(1000);
const VECTOR_SOCKET_ADDR: &str = "127.0.0.1:5021";
Expand All @@ -38,40 +38,50 @@ enum ObservationState {

/// State kept for a single container: its ID, the config it was created with, and the PID of
/// its runner process once one is known.
pub struct Container {
// ID of this container; attached to log lines and bound as the key in the `containers` table
// queries.
container_id: Uuid,
// Config supplied at construction; serialized to JSON when the container row is inserted and
// read during setup and spawn (runner binary URL, network mode, root user flag, stakeholder).
config: protocol::ContainerConfig,

// `None` when constructed via `new`, `Some(pid)` when constructed via `with_pid`. `stop_inner`
// polls this value until it is `Some` or its retry budget runs out.
pid: Mutex<Option<Pid>>,
}

impl Container {
pub fn new(container_id: Uuid) -> Arc<Self> {
pub fn new(container_id: Uuid, config: protocol::ContainerConfig) -> Arc<Self> {
Arc::new(Container {
container_id,
config,

pid: Mutex::new(None),
})
}

pub async fn start(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
config: protocol::ContainerConfig,
) -> Result<()> {
tracing::info!(container_id=?self.container_id, "starting container");
pub fn with_pid(container_id: Uuid, config: protocol::ContainerConfig, pid: Pid) -> Arc<Self> {
Arc::new(Container {
container_id,
config,

pid: Mutex::new(Some(pid)),
})
}

pub async fn start(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
tracing::info!(container_id=?self.container_id, "starting");

// Write container to DB
let config_json = serde_json::to_vec(&self.config)?;
utils::query(|| async {
// NOTE: `ON CONFLICT ... DO NOTHING` is used here in case this query runs but the command is not acknowledged
sqlx::query(indoc!(
"
INSERT INTO containers (
container_id,
config,
start_ts
)
VALUES (?1, ?2)
VALUES (?1, ?2, ?3)
ON CONFLICT (container_id) DO NOTHING
",
))
.bind(self.container_id)
.bind(&config_json)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
Expand All @@ -84,75 +94,76 @@ impl Container {
})
.await?;

{
let s = self.clone();
let ctx = ctx.clone();
// Lifecycle
let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
use std::result::Result::{Err, Ok};

tokio::spawn(async move {
if let Err(err) = s.setup(&ctx, config).await {
tracing::error!(?err, "container run failed");
match self2.setup(&ctx2).await {
Ok(container_runner_path) => match self2.run(&ctx2, container_runner_path).await {
Ok(pid) => {
if let Err(err) = self2.observe(&ctx2, pid).await {
tracing::error!(container_id=?self2.container_id, ?err, "observe failed");
}
}
Err(err) => {
tracing::error!(container_id=?self2.container_id, ?err, "run failed")
}
},
Err(err) => tracing::error!(container_id=?self2.container_id, ?err, "setup failed"),
}

// Cleanup
let mut containers = ctx.containers.write().await;
containers.remove(&s.container_id);
}
});
}
// Cleanup afterwards
self2.cleanup(&ctx2).await
});

Ok(())
}

async fn setup(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
config: protocol::ContainerConfig,
) -> Result<()> {
async fn setup(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<PathBuf> {
tracing::info!(container_id=?self.container_id, "setting up");

let container_path = ctx.container_path(self.container_id);

// Create container working dir
fs::create_dir(&container_path).await?;

// Download container runner
let container_runner_path = ctx
.fetch_container_runner(&config.container_runner_binary_url)
.fetch_container_runner(&self.config.container_runner_binary_url)
.await?;

setup::cni_bundle(self.container_id, &config, &ctx).await?;
self.setup_oci_bundle(&ctx).await?;

// Run CNI setup script
if let protocol::NetworkMode::Bridge = config.network_mode {
setup::cni_network(self.container_id, &config, &ctx).await?;
if let protocol::NetworkMode::Bridge = self.config.network_mode {
self.setup_cni_network(&ctx).await?;
}

Ok(container_runner_path)
}

async fn run(self: &Arc<Self>, ctx: &Arc<Ctx>, container_runner_path: PathBuf) -> Result<Pid> {
tracing::info!(container_id=?self.container_id, "spawning");

let mut runner_env = vec![
(
"PEGBOARD_META_root_user_enabled",
config.root_user_enabled.to_string(),
self.config.root_user_enabled.to_string(),
),
(
"PEGBOARD_META_vector_socket_addr",
VECTOR_SOCKET_ADDR.to_string(),
),
];
runner_env.extend(config.stakeholder.env());

self.run(ctx, container_runner_path, &runner_env).await?;

Ok(())
}

async fn run(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
container_runner_path: PathBuf,
env: &[(&str, String)],
) -> Result<()> {
tracing::info!(container_id=?self.container_id, "spawning");
runner_env.extend(self.config.stakeholder.env());

// Spawn runner which spawns the container
let pid = setup::spawn_orphaned_container_runner(
container_runner_path,
ctx.container_path(self.container_id),
&env,
&runner_env,
)?;

tracing::info!(container_id=?self.container_id, ?pid, "pid received");
Expand All @@ -168,12 +179,14 @@ impl Container {
"
UPDATE containers
SET
running_ts = ?2
running_ts = ?2 AND
pid = ?3
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.bind(pid.as_raw())
.execute(&mut *ctx.sql().await?)
.await
})
Expand All @@ -187,13 +200,13 @@ impl Container {
})
.await?;

self.observe(ctx, pid).await?;

Ok(())
Ok(pid)
}

// Watch container for updates
async fn observe(&self, ctx: &Arc<Ctx>, pid: Pid) -> Result<()> {
pub(crate) async fn observe(&self, ctx: &Arc<Ctx>, pid: Pid) -> Result<()> {
tracing::info!(container_id=?self.container_id, ?pid, "observing");

let exit_code_path = ctx.container_path(self.container_id).join("exit-code");
let proc_path = Path::new("/proc").join(pid.to_string());

Expand Down Expand Up @@ -288,29 +301,29 @@ impl Container {
})
.await?;

tracing::info!(container_id=?self.container_id, "container complete");
tracing::info!(container_id=?self.container_id, "complete");

Ok(())
}

pub async fn stop(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
tracing::info!(container_id=?self.container_id, "stopping");

let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
if let Err(err) = self2.stop_inner(&ctx2).await {
tracing::error!(?err, "container stop failed");
}

// Cleanup regardless
let mut containers = ctx2.containers.write().await;
containers.remove(&self2.container_id);
self2.cleanup(&ctx2).await
});

Ok(())
}

async fn stop_inner(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
let now = Instant::now();
let mut i = 0;

let pid = loop {
if let Some(pid) = *self.pid.lock().await {
Expand All @@ -319,14 +332,15 @@ impl Container {

tracing::warn!(container_id=?self.container_id, "waiting for pid to stop workflow");

if now.elapsed() > STOP_PID_TIMEOUT {
if i > STOP_PID_RETRIES {
tracing::error!(
container_id=?self.container_id,
"timed out waiting for container to get PID, considering container stopped",
);

break None;
}
i += 1;

tokio::time::sleep(STOP_PID_INTERVAL).await;
};
Expand All @@ -353,7 +367,7 @@ impl Container {

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Stopping,
state: protocol::ContainerState::Stopped,
})
.await?;

Expand Down
Loading

0 comments on commit 2bc957b

Please sign in to comment.