Skip to content
This repository has been archived by the owner on Dec 21, 2024. It is now read-only.

Commit

Permalink
feat(toolchain): dispatch backend config update events
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Sep 13, 2024
1 parent 871122d commit 1dea393
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 72 deletions.
12 changes: 12 additions & 0 deletions packages/backend/cli/commands/meta/path.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { z } from "zod";
import { globalOptsSchema, initProject } from "../../common.ts";
import { metaPath } from "../../../toolchain/project/mod.ts";

export const optsSchema = z.object({}).merge(globalOptsSchema);

type Opts = z.infer<typeof optsSchema>;

export async function execute(opts: Opts) {
const project = await initProject(opts);
console.log(metaPath(project));
}
17 changes: 11 additions & 6 deletions packages/backend/cli/execute.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import { z } from "zod";
import { UnreachableError } from "../toolchain/error/mod.ts";

import * as build from "./commands/build.ts";
import * as clean from "./commands/clean.ts";
import * as configShow from "./commands/config/show.ts";
import * as createActor from "./commands/create/actor.ts";
import * as createModule from "./commands/create/module.ts";
import * as createScript from "./commands/create/script.ts";
import * as createTest from "./commands/create/test.ts";
import * as dev from "./commands/dev.ts";
import * as format from "./commands/format.ts";
import * as init from "./commands/init.ts";
import * as lint from "./commands/lint.ts";
import * as test from "./commands/test.ts";
import * as dbInstanceStart from "./commands/db/instance/start.ts";
import * as dbInstanceStatus from "./commands/db/instance/status.ts";
import * as dbInstanceStop from "./commands/db/instance/stop.ts";
Expand All @@ -21,7 +18,12 @@ import * as dbMigratePush from "./commands/db/migrate/push.ts";
import * as dbReset from "./commands/db/reset.ts";
import * as dbSh from "./commands/db/sh.ts";
import * as dbUrl from "./commands/db/url.ts";
import { UnreachableError } from "../toolchain/error/mod.ts";
import * as dev from "./commands/dev.ts";
import * as format from "./commands/format.ts";
import * as init from "./commands/init.ts";
import * as lint from "./commands/lint.ts";
import * as metaPath from "./commands/meta/path.ts";
import * as test from "./commands/test.ts";

export const commandSchema = z.union([
z.object({ build: build.optsSchema }),
Expand All @@ -35,6 +37,7 @@ export const commandSchema = z.union([
z.object({ format: format.optsSchema }),
z.object({ init: init.optsSchema }),
z.object({ lint: lint.optsSchema }),
z.object({ metaPath: metaPath.optsSchema }),
z.object({ test: test.optsSchema }),
z.object({ dbInstanceStart: dbInstanceStart.optsSchema }),
z.object({ dbInstanceStatus: dbInstanceStatus.optsSchema }),
Expand Down Expand Up @@ -71,6 +74,8 @@ export async function executeCommand(command: Command) {
await format.execute(command.format);
} else if ("init" in command) {
await init.execute(command.init);
} else if ("metaPath" in command) {
await metaPath.execute(command.metaPath);
} else if ("lint" in command) {
await lint.execute(command.lint);
} else if ("test" in command) {
Expand Down
232 changes: 178 additions & 54 deletions packages/toolchain/src/tasks/backend_start.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use anyhow::*;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{path::Path, time::Duration};

use crate::{
backend::{self, build_backend_command_raw},
backend::{self, build_backend_command, build_backend_command_raw},
config::{self, meta},
paths,
util::{
process_manager::{CommandOpts, StartMode, StartOpts},
task,
task::{self, backend_config_update},
},
};

Expand Down Expand Up @@ -45,63 +47,185 @@ impl task::Task for Task {

// Start or hook to backend
let task_inner = task.clone();
let exit_code = backend::PROCESS_MANAGER_DEV
.start(
StartOpts {
task,
start_mode: input.start_mode,
base_data_dir: paths::data_dir()?,
},
|| async move {
// Pick dev port
let backend_port = portpicker::pick_unused_port().context("no free ports")?;
let editor_port = portpicker::pick_unused_port().context("no free ports")?;
meta::mutate_project(&paths::data_dir()?, |x| {
x.backend_port = Some(backend_port);
x.editor_port = Some(editor_port);
})
.await?;
let pm_fut = backend::PROCESS_MANAGER_DEV.start(
StartOpts {
task: task.clone(),
start_mode: input.start_mode,
base_data_dir: paths::data_dir()?,
},
|| async move {
// Pick dev port
let backend_port = portpicker::pick_unused_port().context("no free ports")?;
let editor_port = portpicker::pick_unused_port().context("no free ports")?;
meta::mutate_project(&paths::data_dir()?, |x| {
x.backend_port = Some(backend_port);
x.editor_port = Some(editor_port);
})
.await?;

// Build env
let (mut cmd_env, config_path) =
config::settings::try_read(&paths::data_dir()?, |settings| {
let mut env = settings.backend.command_environment.clone();
env.extend(settings.backend.dev.command_environment.clone());
Ok((env, settings.backend.dev.config_path.clone()))
})
.await?;
cmd_env.insert("RIVET_BACKEND_PORT".into(), backend_port.to_string());
cmd_env.insert("RIVET_BACKEND_HOSTNAME".into(), "0.0.0.0".to_string());
cmd_env.insert("RIVET_BACKEND_TERM_COLOR".into(), "never".into());
cmd_env.insert("RIVET_EDITOR_PORT".into(), editor_port.to_string());

// Build command
let cmd = build_backend_command_raw(backend::BackendCommandOpts {
command: "dev",
opts: serde_json::json!({
"project": config_path,
"nonInteractive": true
}),
env: cmd_env,
// Build env
let (mut cmd_env, config_path) =
config::settings::try_read(&paths::data_dir()?, |settings| {
let mut env = settings.backend.command_environment.clone();
env.extend(settings.backend.dev.command_environment.clone());
Ok((env, settings.backend.dev.config_path.clone()))
})
.await?;
cmd_env.insert("RIVET_BACKEND_PORT".into(), backend_port.to_string());
cmd_env.insert("RIVET_BACKEND_HOSTNAME".into(), "0.0.0.0".to_string());
cmd_env.insert("RIVET_BACKEND_TERM_COLOR".into(), "never".into());
cmd_env.insert("RIVET_EDITOR_PORT".into(), editor_port.to_string());

// Publish commandevent
task_inner.event(task::TaskEvent::PortUpdate {
backend_port,
editor_port,
});

Ok(CommandOpts {
command: cmd.command.display().to_string(),
args: cmd.args,
envs: cmd.envs.into_iter().collect(),
current_dir: cmd.current_dir.display().to_string(),
})
},
)
.await?;
// Build command
let cmd = build_backend_command_raw(backend::BackendCommandOpts {
command: "dev",
opts: serde_json::json!({
"project": config_path,
"nonInteractive": true
}),
env: cmd_env,
})
.await?;

// Publish commandevent
task_inner.event(task::TaskEvent::PortUpdate {
backend_port,
editor_port,
});

Ok(CommandOpts {
command: cmd.command.display().to_string(),
args: cmd.args,
envs: cmd.envs.into_iter().collect(),
current_dir: cmd.current_dir.display().to_string(),
})
},
);

// Poll for config file updates
let poll_config_fut = poll_config_file(task.clone());

// Wait futures
let exit_code = tokio::select! {
res = pm_fut => {
res?
}
res = poll_config_fut => {
res?;
bail!("poll_config_file exited unexpectedly");
}
};

Ok(Output { exit_code })
}
}

async fn poll_config_file(task_ctx: task::TaskCtx) -> Result<()> {
// Read meta path from backend
let meta_output = build_backend_command(backend::BackendCommandOpts {
command: "metaPath",
opts: json!({}),
env: Default::default(),
})
.await?
.output()
.await?;

// Parse and validate meta path
let meta_path = String::from_utf8(meta_output.stdout)?;
let meta_path = meta_path.trim();
ensure!(
meta_path.contains("\n"),
"Expected exactly one line of output, got:\n{meta_path:?}"
);
let meta_path = Path::new(meta_path);

// Poll the file for updates
//
// We do this instead of using a file watcher since file watchers are frequently broken across
// platform and will require extensive testing.
let mut last_modified = None;
loop {
tokio::time::sleep(Duration::from_secs(2)).await;

match tokio::fs::metadata(&meta_path).await {
Result::Ok(metadata) => {
if let Result::Ok(file_modified) = metadata.modified() {
if last_modified.map_or(true, |x| file_modified > x) {
// File has changed
last_modified = Some(file_modified);

// Read meta
match read_meta_and_build_event(meta_path).await {
Result::Ok(event) => {
task_ctx.event(task::TaskEvent::BackendConfigUpdate(event));
}
Err(err) => task_ctx.log(format!("Failed to read backend meta: {err}")),
}
}
}
}
Err(_) => {
// Config file does not exist yet. The backend likely has not written it.
}
}
}
}

/// Partial serde struct representing data we need to read from `meta.json`.
///
/// See packages/backend/toolchain/build/meta.ts
mod backend_meta {
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Meta {
pub modules: HashMap<String, Module>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Module {
pub config: ModuleConfig,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModuleConfig {
pub name: String,
}
}

/// Reads the meta.json from the filesystem and converts it to an event.
async fn read_meta_and_build_event(
config_path: impl AsRef<Path>,
) -> Result<backend_config_update::Event> {
// Read editor port
let editor_port = meta::read_project(&paths::data_dir()?, |x| x.editor_port)
.await?
.context("project_meta.editor_port")?;

// Read meta
let meta = tokio::task::block_in_place(|| {
let file = std::fs::File::open(config_path)?;
let meta = serde_json::from_reader::<_, backend_meta::Meta>(&file)?;
Ok(meta)
})?;

let mut modules = meta
.modules
.into_iter()
.map(|(slug, module)| backend_config_update::Module {
slug: slug.clone(),
name: module.config.name,
config_url: format!("http://127.0.0.1:{editor_port}/#{slug}"),
docs_url: format!("https://rivet.gg/modules/{slug}"),
})
.collect::<Vec<_>>();
modules.sort_by_cached_key(|x| x.name.clone());

// Convert to event
Ok(backend_config_update::Event { modules })
}
13 changes: 1 addition & 12 deletions packages/toolchain/src/util/task/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
use anyhow::*;
use serde::Serialize;
use std::{process::Stdio, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
sync::{broadcast, mpsc},
};

#[derive(Serialize)]
pub enum TaskEvent {
#[serde(rename = "log")]
Log(String),
#[serde(rename = "result")]
Result {
result: Box<serde_json::value::RawValue>,
},
#[serde(rename = "port_update")]
PortUpdate { backend_port: u16, editor_port: u16 },
}
use super::TaskEvent;

// HACK: Tokio bug drops the channel using the native `UnboundedSender::clone`, so we have to use
// `Arc`.
Expand Down
31 changes: 31 additions & 0 deletions packages/toolchain/src/util/task/event.rs
Original file line number Diff line number Diff line change
@@ -1 +1,32 @@
use serde::Serialize;

#[derive(Serialize)]
pub enum TaskEvent {
#[serde(rename = "log")]
Log(String),
#[serde(rename = "result")]
Result {
result: Box<serde_json::value::RawValue>,
},
#[serde(rename = "port_update")]
PortUpdate { backend_port: u16, editor_port: u16 },
#[serde(rename = "backend_config_update")]
BackendConfigUpdate(backend_config_update::Event),
}

pub mod backend_config_update {
use serde::Serialize;

#[derive(Serialize)]
pub struct Event {
pub modules: Vec<Module>,
}

#[derive(Serialize)]
pub struct Module {
pub slug: String,
pub name: String,
pub config_url: String,
pub docs_url: String,
}
}
2 changes: 2 additions & 0 deletions packages/toolchain/src/util/task/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod ctx;
pub mod event;
mod register;
mod run;
mod task;

pub use ctx::*;
pub use event::*;
pub use register::*;
pub use run::*;
pub use task::*;

0 comments on commit 1dea393

Please sign in to comment.