Skip to content
This repository has been archived by the owner on Dec 21, 2024. It is now read-only.

Commit

Permalink
feat(toolchain): dispatch backend config update events (#397)
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Sep 17, 2024
1 parent 99a7cae commit 6885c83
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 72 deletions.
12 changes: 12 additions & 0 deletions packages/backend/cli/commands/meta/path.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { z } from "zod";
import { globalOptsSchema, initProject } from "../../common.ts";
import { metaPath } from "../../../toolchain/project/mod.ts";

export const optsSchema = z.object({}).merge(globalOptsSchema);

type Opts = z.infer<typeof optsSchema>;

export async function execute(opts: Opts) {
const project = await initProject(opts);
console.log(metaPath(project));
}
17 changes: 11 additions & 6 deletions packages/backend/cli/execute.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import { z } from "zod";
import { UnreachableError } from "../toolchain/error/mod.ts";

import * as build from "./commands/build.ts";
import * as clean from "./commands/clean.ts";
import * as configShow from "./commands/config/show.ts";
import * as createActor from "./commands/create/actor.ts";
import * as createModule from "./commands/create/module.ts";
import * as createScript from "./commands/create/script.ts";
import * as createTest from "./commands/create/test.ts";
import * as dev from "./commands/dev.ts";
import * as format from "./commands/format.ts";
import * as init from "./commands/init.ts";
import * as lint from "./commands/lint.ts";
import * as test from "./commands/test.ts";
import * as dbInstanceStart from "./commands/db/instance/start.ts";
import * as dbInstanceStatus from "./commands/db/instance/status.ts";
import * as dbInstanceStop from "./commands/db/instance/stop.ts";
Expand All @@ -21,7 +18,12 @@ import * as dbMigratePush from "./commands/db/migrate/push.ts";
import * as dbReset from "./commands/db/reset.ts";
import * as dbSh from "./commands/db/sh.ts";
import * as dbUrl from "./commands/db/url.ts";
import { UnreachableError } from "../toolchain/error/mod.ts";
import * as dev from "./commands/dev.ts";
import * as format from "./commands/format.ts";
import * as init from "./commands/init.ts";
import * as lint from "./commands/lint.ts";
import * as metaPath from "./commands/meta/path.ts";
import * as test from "./commands/test.ts";

export const commandSchema = z.union([
z.object({ build: build.optsSchema }),
Expand All @@ -35,6 +37,7 @@ export const commandSchema = z.union([
z.object({ format: format.optsSchema }),
z.object({ init: init.optsSchema }),
z.object({ lint: lint.optsSchema }),
z.object({ metaPath: metaPath.optsSchema }),
z.object({ test: test.optsSchema }),
z.object({ dbInstanceStart: dbInstanceStart.optsSchema }),
z.object({ dbInstanceStatus: dbInstanceStatus.optsSchema }),
Expand Down Expand Up @@ -71,6 +74,8 @@ export async function executeCommand(command: Command) {
await format.execute(command.format);
} else if ("init" in command) {
await init.execute(command.init);
} else if ("metaPath" in command) {
await metaPath.execute(command.metaPath);
} else if ("lint" in command) {
await lint.execute(command.lint);
} else if ("test" in command) {
Expand Down
269 changes: 215 additions & 54 deletions packages/toolchain/src/tasks/backend_start.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use anyhow::*;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{path::Path, time::Duration};

use crate::{
backend::{self, build_backend_command_raw},
backend::{self, build_backend_command, build_backend_command_raw},
config::{self, meta},
paths,
util::{
process_manager::{CommandOpts, StartMode, StartOpts},
task,
task::{self, backend_config_update},
},
};

Expand Down Expand Up @@ -45,63 +47,222 @@ impl task::Task for Task {

// Start or hook to backend
let task_inner = task.clone();
let exit_code = backend::PROCESS_MANAGER_DEV
.start(
StartOpts {
task,
start_mode: input.start_mode,
base_data_dir: paths::data_dir()?,
},
|| async move {
// Pick dev port
let backend_port = portpicker::pick_unused_port().context("no free ports")?;
let editor_port = portpicker::pick_unused_port().context("no free ports")?;
meta::mutate_project(&paths::data_dir()?, |x| {
x.backend_port = Some(backend_port);
x.editor_port = Some(editor_port);
})
.await?;
let pm_fut = backend::PROCESS_MANAGER_DEV.start(
StartOpts {
task: task.clone(),
start_mode: input.start_mode,
base_data_dir: paths::data_dir()?,
},
|| async move {
// Pick dev port
let backend_port = portpicker::pick_unused_port().context("no free ports")?;
let editor_port = portpicker::pick_unused_port().context("no free ports")?;
meta::mutate_project(&paths::data_dir()?, |x| {
x.backend_port = Some(backend_port);
x.editor_port = Some(editor_port);
})
.await?;

// Build env
let (mut cmd_env, config_path) =
config::settings::try_read(&paths::data_dir()?, |settings| {
let mut env = settings.backend.command_environment.clone();
env.extend(settings.backend.dev.command_environment.clone());
Ok((env, settings.backend.dev.config_path.clone()))
})
.await?;
cmd_env.insert("RIVET_BACKEND_PORT".into(), backend_port.to_string());
cmd_env.insert("RIVET_BACKEND_HOSTNAME".into(), "0.0.0.0".to_string());
cmd_env.insert("RIVET_BACKEND_TERM_COLOR".into(), "never".into());
cmd_env.insert("RIVET_EDITOR_PORT".into(), editor_port.to_string());

// Build command
let cmd = build_backend_command_raw(backend::BackendCommandOpts {
command: "dev",
opts: serde_json::json!({
"project": config_path,
"nonInteractive": true
}),
env: cmd_env,
// Build env
let (mut cmd_env, config_path) =
config::settings::try_read(&paths::data_dir()?, |settings| {
let mut env = settings.backend.command_environment.clone();
env.extend(settings.backend.dev.command_environment.clone());
Ok((env, settings.backend.dev.config_path.clone()))
})
.await?;
cmd_env.insert("RIVET_BACKEND_PORT".into(), backend_port.to_string());
cmd_env.insert("RIVET_BACKEND_HOSTNAME".into(), "0.0.0.0".to_string());
cmd_env.insert("RIVET_BACKEND_TERM_COLOR".into(), "never".into());
cmd_env.insert("RIVET_EDITOR_PORT".into(), editor_port.to_string());

// Publish commandevent
task_inner.event(task::TaskEvent::PortUpdate {
backend_port,
editor_port,
});

Ok(CommandOpts {
command: cmd.command.display().to_string(),
args: cmd.args,
envs: cmd.envs.into_iter().collect(),
current_dir: cmd.current_dir.display().to_string(),
})
},
)
.await?;
// Build command
let cmd = build_backend_command_raw(backend::BackendCommandOpts {
command: "dev",
opts: serde_json::json!({
"project": config_path,
"nonInteractive": true
}),
env: cmd_env,
})
.await?;

// Publish commandevent
task_inner.event(task::TaskEvent::PortUpdate {
backend_port,
editor_port,
});

Ok(CommandOpts {
command: cmd.command.display().to_string(),
args: cmd.args,
envs: cmd.envs.into_iter().collect(),
current_dir: cmd.current_dir.display().to_string(),
})
},
);

// Poll for config file updates
let poll_config_fut = poll_config_file(task.clone());

// Wait futures
let exit_code = tokio::select! {
res = pm_fut => {
res?
}
res = poll_config_fut => {
res?;
bail!("poll_config_file exited unexpectedly");
}
};

Ok(Output { exit_code })
}
}

async fn poll_config_file(task_ctx: task::TaskCtx) -> Result<()> {
// Read meta path from backend
let mut interval = tokio::time::interval(Duration::from_secs(2));
let meta_output = loop {
interval.tick().await;

let output = build_backend_command(backend::BackendCommandOpts {
command: "configManifestPath",
opts: json!({
"project": null
}),
env: Default::default(),
})
.await?
.output()
.await?;

if output.status.success() {
break output;
}
};

// Parse and validate meta path
let meta_path = String::from_utf8(meta_output.stdout)?;
let meta_path = meta_path.trim();
ensure!(
!meta_path.contains("\n"),
"Expected exactly one line of output, got:\n{meta_path:?}"
);
let meta_path = Path::new(meta_path);

// Poll the file for updates
//
// We do this instead of using a file watcher since file watchers are frequently broken across
// platform and will require extensive testing.
let mut interval = tokio::time::interval(Duration::from_secs(2));
let mut last_file_modified = None;
let mut last_editor_port = None;
loop {
interval.tick().await;

// Check for file change
let editor_port = if let Some(editor_port) =
meta::read_project(&paths::data_dir()?, |x| x.editor_port).await?
{
editor_port
} else {
// The editor port has not been chosen yet.
continue;
};

// Check for file change
let file_modified = match tokio::fs::metadata(&meta_path).await {
Result::Ok(metadata) => match metadata.modified() {
Result::Ok(x) => x,
Err(err) => {
task_ctx.log(format!("Failed to read file modification time: {err}"));
continue;
}
},
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
// Config file does not exist yet. The backend likely has not written it.
continue;
}
Err(err) => {
task_ctx.log(format!("Failed to read file metadata: {err}"));
continue;
}
};

// Publish event if changed
let updated_file_modified = last_file_modified.map_or(true, |x| file_modified > x);
let updated_editor_port = last_editor_port.map_or(true, |x| x != editor_port);
if updated_file_modified || updated_editor_port {
last_file_modified = Some(file_modified);
last_editor_port = Some(editor_port);

match read_meta_and_build_event(meta_path, editor_port).await {
Result::Ok(event) => {
task_ctx.event(task::TaskEvent::BackendConfigUpdate(event));
}
Err(err) => task_ctx.log(format!("Failed to read backend meta: {err}")),
}
}
}
}

/// Partial serde struct representing data we need to read from `meta.json`.
///
/// See packages/backend/toolchain/build/meta.ts
mod backend_meta {
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Meta {
pub modules: HashMap<String, Module>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Module {
pub config: ModuleConfig,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModuleConfig {
pub name: String,
}
}

/// Reads the meta.json from the filesystem and converts it to an event.
///
/// Uses this intermediate step to convert the data in the toolchain instead of passing the direct
/// manifest to the plugin in order to:
/// - Ensure a consistent format
/// - Reduce overhead of updates (the config is massive)
/// - Enhance with any toolchain-specific data (e.g. edtor config url)
async fn read_meta_and_build_event(
config_path: impl AsRef<Path>,
editor_port: u16,
) -> Result<backend_config_update::Event> {
// Read meta
let meta = tokio::task::block_in_place(|| {
let file = std::fs::File::open(config_path)?;
let meta = serde_json::from_reader::<_, backend_meta::Meta>(&file)?;
Ok(meta)
})?;

// Convert to event
let mut modules = meta
.modules
.into_iter()
.map(|(slug, module)| backend_config_update::Module {
slug: slug.clone(),
name: module.config.name,
config_url: format!("http://127.0.0.1:{editor_port}/#{slug}"),
docs_url: format!("https://rivet.gg/modules/{slug}"),
})
.collect::<Vec<_>>();
modules.sort_by_cached_key(|x| x.name.clone());

Ok(backend_config_update::Event { modules })
}
13 changes: 1 addition & 12 deletions packages/toolchain/src/util/task/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
use anyhow::*;
use serde::Serialize;
use std::{process::Stdio, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
sync::{broadcast, mpsc},
};

#[derive(Serialize)]
pub enum TaskEvent {
#[serde(rename = "log")]
Log(String),
#[serde(rename = "result")]
Result {
result: Box<serde_json::value::RawValue>,
},
#[serde(rename = "port_update")]
PortUpdate { backend_port: u16, editor_port: u16 },
}
use super::TaskEvent;

// HACK: Tokio bug drops the channel using the native `UnboundedSender::clone`, so we have to use
// `Arc`.
Expand Down
Loading

0 comments on commit 6885c83

Please sign in to comment.