From b7a00d35ee08d4c4f6af5210c4006a6c96961ad0 Mon Sep 17 00:00:00 2001 From: Nicholas Yang Date: Tue, 10 Sep 2024 17:30:08 -0400 Subject: [PATCH] feat(turborepo): web ui (#8895) ### Description Implements a very very alpha version of a web UI. Uses a GraphQL query to get the current run and tasks. ### Testing Instructions Would love some ideas here on tests. To try out manually, go to #35066 in the other repo and run `turbo-studio`, then execute a turbo command with the `ui` flag set to `web` --- Cargo.lock | 25 ++ Cargo.toml | 2 + crates/turborepo-lib/Cargo.toml | 4 +- crates/turborepo-lib/src/commands/run.rs | 10 +- crates/turborepo-lib/src/opts.rs | 5 + crates/turborepo-lib/src/run/builder.rs | 12 +- crates/turborepo-lib/src/run/error.rs | 2 + crates/turborepo-lib/src/run/mod.rs | 52 +++- crates/turborepo-lib/src/run/watch.rs | 14 +- .../turborepo-lib/src/task_graph/visitor.rs | 51 ++-- crates/turborepo-lib/src/turbo_json/mod.rs | 8 + crates/turborepo-ui/Cargo.toml | 10 + crates/turborepo-ui/src/lib.rs | 6 + crates/turborepo-ui/src/sender.rs | 156 ++++++++++ crates/turborepo-ui/src/tui/event.rs | 9 +- crates/turborepo-ui/src/tui/handle.rs | 167 ++++------- crates/turborepo-ui/src/tui/mod.rs | 4 +- crates/turborepo-ui/src/wui/event.rs | 34 +++ crates/turborepo-ui/src/wui/mod.rs | 24 ++ crates/turborepo-ui/src/wui/sender.rs | 78 +++++ crates/turborepo-ui/src/wui/server.rs | 110 +++++++ crates/turborepo-ui/src/wui/subscriber.rs | 276 ++++++++++++++++++ turborepo-tests/integration/tests/no-args.t | 2 +- .../integration/tests/turbo-help.t | 11 +- web-ui/next-env.d.ts | 5 - 25 files changed, 889 insertions(+), 188 deletions(-) create mode 100644 crates/turborepo-ui/src/sender.rs create mode 100644 crates/turborepo-ui/src/wui/event.rs create mode 100644 crates/turborepo-ui/src/wui/mod.rs create mode 100644 crates/turborepo-ui/src/wui/sender.rs create mode 100644 crates/turborepo-ui/src/wui/server.rs create mode 100644 crates/turborepo-ui/src/wui/subscriber.rs delete mode 100644 web-ui/next-env.d.ts diff --git a/Cargo.lock b/Cargo.lock index b7778fa8c0eeb..4949c6e3246c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5413,6 +5413,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.5.0", + "bytes", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.2" @@ -6137,7 +6153,12 @@ name = "turborepo-ui" version = "0.1.0" dependencies = [ "anyhow", + "async-graphql", + "async-graphql-axum", + "async-stream", "atty", + "axum 0.7.5", + "axum-server 0.7.1", "base64 0.22.1", "chrono", "clipboard-win", @@ -6149,9 +6170,13 @@ dependencies = [ "lazy_static", "nix 0.26.2", "ratatui", + "serde", + "serde_json", "tempfile", "test-case", "thiserror", + "tokio", + "tower-http", "tracing", "tui-term", "turbopath", diff --git a/Cargo.toml b/Cargo.toml index 400e03d2bbfc2..d655030df1620 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,8 @@ async-compression = { version = "0.3.13", default-features = false, features = [ "gzip", "tokio", ] } +async-graphql = "7.0.7" +async-graphql-axum = "7.0.7" async-trait = "0.1.64" atty = "0.2.14" axum = "0.7.5" diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index 43f802ab32780..92a2976c76d54 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -32,8 +32,8 @@ turborepo-vercel-api-mock = { workspace = true } workspace = true [dependencies] -async-graphql = "7.0.7" -async-graphql-axum = "7.0.7" +async-graphql = { workspace = true } +async-graphql-axum = { workspace = true } atty = { workspace = true } axum = { workspace = true } biome_deserialize = { workspace = true } diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index fe9c251eedbe7..46c67aac34ece 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ b/crates/turborepo-lib/src/commands/run.rs @@ -2,6 +2,7 @@ use std::future::Future; use tracing::error; use turborepo_telemetry::events::command::CommandEventBuilder; +use turborepo_ui::sender::UISender; use crate::{commands::CommandBase, run, run::builder::RunBuilder, signal::SignalHandler}; @@ -44,15 +45,20 @@ pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result>, pub(crate) experimental_space_id: Option, pub is_github_actions: bool, + pub ui_mode: UIMode, } impl RunOpts { @@ -267,6 +269,7 @@ impl<'a> TryFrom> for RunOpts { env_mode: inputs.config.env_mode(), cache_dir: inputs.config.cache_dir().into(), is_github_actions, + ui_mode: inputs.config.ui(), }) } } @@ -394,6 +397,7 @@ mod test { use crate::{ cli::DryRunMode, opts::{Opts, RunCacheOpts, ScopeOpts}, + turbo_json::UIMode, }; #[derive(Default)] @@ -499,6 +503,7 @@ mod test { only: opts_input.only, dry_run: opts_input.dry_run, graph: None, + ui_mode: UIMode::Stream, single_package: false, log_prefix: crate::opts::ResolvedLogPrefix::Task, log_order: crate::opts::ResolvedLogOrder::Stream, diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index ede9e9577a7bc..04641f93df66c 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -57,7 +57,6 @@ pub struct RunBuilder { root_turbo_json_path: AbsoluteSystemPathBuf, color_config: ColorConfig, version: &'static str, - ui_mode: UIMode, api_client: APIClient, analytics_sender: Option, // In watch mode, we can have a changed package that we want to serve as an entrypoint. @@ -78,13 +77,12 @@ impl RunBuilder { let allow_missing_package_manager = config.allow_no_package_manager(); let version = base.version(); - let ui_mode = config.ui(); let processes = ProcessManager::new( // We currently only use a pty if the following are met: // - we're attached to a tty atty::is(atty::Stream::Stdout) && // - if we're on windows, we're using the UI - (!cfg!(windows) || matches!(ui_mode, UIMode::Tui)), + (!cfg!(windows) || matches!(opts.run_opts.ui_mode, UIMode::Tui)), ); let root_turbo_json_path = config.root_turbo_json_path(&base.repo_root); @@ -101,7 +99,6 @@ impl RunBuilder { repo_root, color_config: ui, version, - ui_mode, api_auth, analytics_sender: None, entrypoint_packages: None, @@ -413,7 +410,6 @@ impl RunBuilder { Ok(Run { version: self.version, color_config: self.color_config, - ui_mode: self.ui_mode, start_at, processes: self.processes, run_telemetry, @@ -468,7 +464,11 @@ impl RunBuilder { if !self.opts.run_opts.parallel { engine - .validate(pkg_dep_graph, self.opts.run_opts.concurrency, self.ui_mode) + .validate( + pkg_dep_graph, + self.opts.run_opts.concurrency, + self.opts.run_opts.ui_mode, + ) .map_err(Error::EngineValidation)?; } diff --git a/crates/turborepo-lib/src/run/error.rs b/crates/turborepo-lib/src/run/error.rs index 706a3ba5a6e79..b0806dd18a47b 100644 --- a/crates/turborepo-lib/src/run/error.rs +++ b/crates/turborepo-lib/src/run/error.rs @@ -57,5 +57,7 @@ pub enum Error { #[error(transparent)] Daemon(#[from] daemon::DaemonError), #[error(transparent)] + UI(#[from] turborepo_ui::Error), + #[error(transparent)] Tui(#[from] tui::Error), } diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 8c4949f22cdf9..db28bb373f95f 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -31,7 +31,10 @@ use turborepo_env::EnvironmentVariableMap; use turborepo_repository::package_graph::{PackageGraph, PackageName, PackageNode}; use turborepo_scm::SCM; use turborepo_telemetry::events::generic::GenericEventBuilder; -use turborepo_ui::{cprint, cprintln, tui, tui::AppSender, ColorConfig, BOLD_GREY, GREY}; +use turborepo_ui::{ + cprint, cprintln, sender::UISender, tui, tui::TuiSender, wui::sender::WebUISender, ColorConfig, + BOLD_GREY, GREY, +}; pub use crate::run::error::Error; use crate::{ @@ -69,9 +72,13 @@ pub struct Run { task_access: TaskAccess, daemon: Option>, should_print_prelude: bool, - ui_mode: UIMode, } +type UIResult = Result>)>, Error>; + +type WuiResult = UIResult; +type TuiResult = UIResult; + impl Run { fn has_persistent_tasks(&self) -> bool { self.engine.has_persistent_tasks @@ -195,24 +202,41 @@ impl Run { } pub fn has_tui(&self) -> bool { - self.ui_mode.use_tui() + self.opts.run_opts.ui_mode.use_tui() } pub fn should_start_ui(&self) -> Result { - Ok(self.ui_mode.use_tui() + Ok(self.opts.run_opts.ui_mode.use_tui() && self.opts.run_opts.dry_run.is_none() && tui::terminal_big_enough()?) } - #[allow(clippy::type_complexity)] - pub fn start_experimental_ui( - &self, - ) -> Result>)>, Error> { + pub fn start_ui(&self) -> UIResult { // Print prelude here as this needs to happen before the UI is started if self.should_print_prelude { self.print_run_prelude(); } + match self.opts.run_opts.ui_mode { + UIMode::Tui => self + .start_terminal_ui() + .map(|res| res.map(|(sender, handle)| (UISender::Tui(sender), handle))), + UIMode::Stream => Ok(None), + UIMode::Web => self + .start_web_ui() + .map(|res| res.map(|(sender, handle)| (UISender::Wui(sender), handle))), + } + } + fn start_web_ui(&self) -> WuiResult { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let handle = tokio::spawn(turborepo_ui::wui::server::start_server(rx)); + + Ok(Some((WebUISender { tx }, handle))) + } + + #[allow(clippy::type_complexity)] + fn start_terminal_ui(&self) -> TuiResult { if !self.should_start_ui()? { return Ok(None); } @@ -223,8 +247,8 @@ impl Run { return Ok(None); } - let (sender, receiver) = AppSender::new(); - let handle = tokio::task::spawn_blocking(move || tui::run_app(task_names, receiver)); + let (sender, receiver) = TuiSender::new(); + let handle = tokio::task::spawn_blocking(move || Ok(tui::run_app(task_names, receiver)?)); Ok(Some((sender, handle))) } @@ -236,11 +260,7 @@ impl Run { } } - pub async fn run( - &mut self, - experimental_ui_sender: Option, - is_watch: bool, - ) -> Result { + pub async fn run(&mut self, ui_sender: Option, is_watch: bool) -> Result { let skip_cache_writes = self.opts.runcache_opts.skip_writes; if let Some(subscriber) = self.signal_handler.subscribe() { let run_cache = self.run_cache.clone(); @@ -427,7 +447,7 @@ impl Run { self.processes.clone(), &self.repo_root, global_env, - experimental_ui_sender, + ui_sender, is_watch, ); diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 995ff810400c4..7ba86ff5074a7 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -11,7 +11,7 @@ use tokio::{ use tracing::{instrument, trace}; use turborepo_repository::package_graph::PackageName; use turborepo_telemetry::events::command::CommandEventBuilder; -use turborepo_ui::{tui, tui::AppSender}; +use turborepo_ui::sender::UISender; use crate::{ cli::{Command, RunArgs}, @@ -53,8 +53,8 @@ pub struct WatchClient { base: CommandBase, telemetry: CommandEventBuilder, handler: SignalHandler, - ui_sender: Option, - ui_handle: Option>>, + ui_sender: Option, + ui_handle: Option>>, } struct PersistentRunHandle { @@ -99,6 +99,8 @@ pub enum Error { SignalInterrupt, #[error("package change error")] PackageChange(#[from] tonic::Status), + #[error(transparent)] + UI(#[from] turborepo_ui::Error), #[error("could not connect to UI thread")] UISend(String), #[error("cannot use root turbo.json at {0} with watch mode")] @@ -134,7 +136,7 @@ impl WatchClient { let watched_packages = run.get_relevant_packages(); - let (sender, handle) = run.start_experimental_ui()?.unzip(); + let (ui_sender, ui_handle) = run.start_ui()?.unzip(); let connector = DaemonConnector { can_start_server: true, @@ -150,8 +152,8 @@ impl WatchClient { handler, telemetry, persistent_tasks_handle: None, - ui_sender: sender, - ui_handle: handle, + ui_sender, + ui_handle, }) } diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs index 354c3dc6a1007..7c683161d01d6 100644 --- a/crates/turborepo-lib/src/task_graph/visitor.rs +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -25,13 +25,15 @@ use turborepo_telemetry::events::{ generic::GenericEventBuilder, task::PackageTaskEventBuilder, EventBuilder, TrackedErrors, }; use turborepo_ui::{ - tui::{self, event::CacheResult, AppSender, TuiTask}, + sender::{TaskSender, UISender}, + tui::event::CacheResult, ColorConfig, ColorSelector, OutputClient, OutputSink, OutputWriter, PrefixedUI, }; use which::which; use crate::{ cli::EnvMode, + config::UIMode, engine::{Engine, ExecutionOptions, StopExecution}, opts::RunOpts, process::{ChildExit, Command, ProcessManager}, @@ -64,8 +66,8 @@ pub struct Visitor<'a> { sink: OutputSink, task_hasher: TaskHasher<'a>, color_config: ColorConfig, - experimental_ui_sender: Option, is_watch: bool, + ui_sender: Option, } #[derive(Debug, thiserror::Error, Diagnostic)] @@ -117,7 +119,7 @@ impl<'a> Visitor<'a> { manager: ProcessManager, repo_root: &'a AbsoluteSystemPath, global_env: EnvironmentVariableMap, - experimental_ui_sender: Option, + ui_sender: Option, is_watch: bool, ) -> Self { let task_hasher = TaskHasher::new( @@ -130,10 +132,7 @@ impl<'a> Visitor<'a> { let sink = Self::sink(run_opts); let color_cache = ColorSelector::default(); // Set up correct size for underlying pty - if let Some(pane_size) = experimental_ui_sender - .as_ref() - .and_then(|sender| sender.pane_size()) - { + if let Some(pane_size) = ui_sender.as_ref().and_then(|sender| sender.pane_size()) { manager.set_pty_size(pane_size.rows, pane_size.cols); } @@ -152,7 +151,7 @@ impl<'a> Visitor<'a> { task_hasher, color_config, global_env, - experimental_ui_sender, + ui_sender, is_watch, } } @@ -289,7 +288,7 @@ impl<'a> Visitor<'a> { let vendor_behavior = Vendor::infer().and_then(|vendor| vendor.behavior.as_ref()); - let output_client = if let Some(handle) = &self.experimental_ui_sender { + let output_client = if let Some(handle) = &self.ui_sender { TaskOutput::UI(handle.task(info.to_string())) } else { TaskOutput::Direct(self.output_client(&info, vendor_behavior)) @@ -327,7 +326,7 @@ impl<'a> Visitor<'a> { drop(factory); if !self.is_watch { - if let Some(handle) = &self.experimental_ui_sender { + if let Some(handle) = &self.ui_sender { handle.stop(); } } @@ -502,8 +501,8 @@ impl<'a> Visitor<'a> { pub fn dry_run(&mut self) { self.dry = true; - // No need to start a TUI on dry run - self.experimental_ui_sender = None; + // No need to start a UI on dry run + self.ui_sender = None; } } @@ -557,7 +556,7 @@ impl std::io::Write for StdWriter { /// interacting with them. enum TaskOutput { Direct(OutputClient), - UI(tui::TuiTask), + UI(TaskSender), } fn turbo_regex() -> &'static Regex { @@ -644,7 +643,7 @@ struct ExecContextFactory<'a> { impl<'a> ExecContextFactory<'a> { pub fn new( - visitor: &'a Visitor, + visitor: &'a Visitor<'a>, errors: Arc>>, manager: ProcessManager, engine: &'a Arc, @@ -673,8 +672,8 @@ impl<'a> ExecContextFactory<'a> { let task_id_string = &task_id.to_string(); ExecContext { engine: self.engine.clone(), + ui_mode: self.visitor.run_opts.ui_mode, color_config: self.visitor.color_config, - experimental_ui: self.visitor.experimental_ui_sender.is_some(), is_github_actions: self.visitor.run_opts.is_github_actions, pretty_prefix: self .visitor @@ -713,7 +712,7 @@ impl<'a> ExecContextFactory<'a> { struct ExecContext { engine: Arc, color_config: ColorConfig, - experimental_ui: bool, + ui_mode: UIMode, is_github_actions: bool, pretty_prefix: StyledObject, task_id: TaskId<'static>, @@ -762,7 +761,7 @@ impl ExecContext { &mut self, parent_span_id: Option, tracker: TaskTracker<()>, - output_client: TaskOutput, + output_client: TaskOutput, callback: oneshot::Sender>, spaces_client: Option, telemetry: &PackageTaskEventBuilder, @@ -869,13 +868,13 @@ impl ExecContext { async fn execute_inner( &mut self, - output_client: &TaskOutput, + output_client: &TaskOutput, telemetry: &PackageTaskEventBuilder, ) -> Result { let task_start = Instant::now(); let mut prefixed_ui = self.prefixed_ui(output_client); - if self.experimental_ui { + if self.ui_mode.has_sender() { if let TaskOutput::UI(task) = output_client { let output_logs = self.task_cache.output_logs().into(); task.start(output_logs); @@ -926,7 +925,7 @@ impl ExecContext { cmd.env("TURBO_HASH", &self.task_hash); // Allow downstream tools to detect if the task is being ran with TUI - if self.experimental_ui { + if self.ui_mode.use_tui() { cmd.env("TURBO_IS_TUI", "true"); } @@ -963,7 +962,7 @@ impl ExecContext { } }; - if self.experimental_ui && self.takes_input { + if self.ui_mode.has_sender() && self.takes_input { if let TaskOutput::UI(task) = output_client { if let Some(stdin) = process.stdin() { task.set_stdin(stdin); @@ -1104,11 +1103,11 @@ impl DryRunExecContext { /// Struct for displaying information about task's cache enum TaskCacheOutput { Direct(PrefixedUI), - UI(TuiTask), + UI(TaskSender), } impl TaskCacheOutput { - fn task_writer(&mut self) -> Either, TuiTask> { + fn task_writer(&mut self) -> Either, TaskSender> { match self { TaskCacheOutput::Direct(prefixed) => Either::Left(prefixed.output_prefixed_writer()), TaskCacheOutput::UI(task) => Either::Right(task.clone()), @@ -1163,21 +1162,21 @@ impl TaskOutput { } } - pub fn stdout(&self) -> Either, TuiTask> { + pub fn stdout(&self) -> Either, TaskSender> { match self { TaskOutput::Direct(client) => Either::Left(client.stdout()), TaskOutput::UI(client) => Either::Right(client.clone()), } } - pub fn stderr(&self) -> Either, TuiTask> { + pub fn stderr(&self) -> Either, TaskSender> { match self { TaskOutput::Direct(client) => Either::Left(client.stderr()), TaskOutput::UI(client) => Either::Right(client.clone()), } } - pub fn task_logs(&self) -> Either, TuiTask> { + pub fn task_logs(&self) -> Either, TaskSender> { match self { TaskOutput::Direct(client) => Either::Left(client.stdout()), TaskOutput::UI(client) => Either::Right(client.clone()), diff --git a/crates/turborepo-lib/src/turbo_json/mod.rs b/crates/turborepo-lib/src/turbo_json/mod.rs index b76d043a3fd16..e84b0cf6073c1 100644 --- a/crates/turborepo-lib/src/turbo_json/mod.rs +++ b/crates/turborepo-lib/src/turbo_json/mod.rs @@ -177,6 +177,8 @@ pub enum UIMode { Tui, /// Use the standard output stream Stream, + /// Use the web user interface (experimental) + Web, } impl Default for UIMode { @@ -189,6 +191,12 @@ impl UIMode { pub fn use_tui(&self) -> bool { matches!(self, Self::Tui) } + + /// Returns true if the UI mode has a sender, + /// i.e. web or tui but not stream + pub fn has_sender(&self) -> bool { + matches!(self, Self::Tui | Self::Web) + } } #[derive(Serialize, Default, Debug, PartialEq, Clone, Iterable, Deserializable)] diff --git a/crates/turborepo-ui/Cargo.toml b/crates/turborepo-ui/Cargo.toml index bc20ec39440a3..21e5f002da9bb 100644 --- a/crates/turborepo-ui/Cargo.toml +++ b/crates/turborepo-ui/Cargo.toml @@ -15,7 +15,12 @@ test-case = { workspace = true } workspace = true [dependencies] +async-graphql = { workspace = true } +async-graphql-axum = { workspace = true } +async-stream = "0.3.5" atty = { workspace = true } +axum = { workspace = true, features = ["ws"] } +axum-server = { workspace = true } base64 = "0.22" chrono = { workspace = true } console = { workspace = true } @@ -25,7 +30,12 @@ indicatif = { workspace = true } lazy_static = { workspace = true } nix = { version = "0.26.2", features = ["signal"] } ratatui = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } + +tower-http = { version = "0.5.2", features = ["cors"] } tracing = { workspace = true } tui-term = { workspace = true } turbopath = { workspace = true } diff --git a/crates/turborepo-ui/src/lib.rs b/crates/turborepo-ui/src/lib.rs index 4b21d92c0418c..307ec9f50d981 100644 --- a/crates/turborepo-ui/src/lib.rs +++ b/crates/turborepo-ui/src/lib.rs @@ -9,7 +9,9 @@ mod line; mod logs; mod output; mod prefixed; +pub mod sender; pub mod tui; +pub mod wui; use std::{borrow::Cow, env, f64::consts::PI, time::Duration}; @@ -29,6 +31,10 @@ pub use crate::{ #[derive(Debug, Error)] pub enum Error { + #[error(transparent)] + Tui(#[from] tui::Error), + #[error(transparent)] + Wui(#[from] wui::Error), #[error("cannot read logs: {0}")] CannotReadLogs(#[source] std::io::Error), #[error("cannot write logs: {0}")] diff --git a/crates/turborepo-ui/src/sender.rs b/crates/turborepo-ui/src/sender.rs new file mode 100644 index 0000000000000..a7b3c11d39be1 --- /dev/null +++ b/crates/turborepo-ui/src/sender.rs @@ -0,0 +1,156 @@ +use std::sync::{Arc, Mutex}; + +use crate::{ + tui, + tui::event::{CacheResult, OutputLogs, PaneSize, TaskResult}, + wui::sender, +}; + +/// Enum to abstract over sending events to either the Tui or the Web UI +#[derive(Debug, Clone)] +pub enum UISender { + Tui(tui::TuiSender), + Wui(sender::WebUISender), +} + +impl UISender { + pub fn start_task(&self, task: String, output_logs: OutputLogs) { + match self { + UISender::Tui(sender) => sender.start_task(task, output_logs), + UISender::Wui(sender) => sender.start_task(task, output_logs), + } + } + + pub fn restart_tasks(&self, tasks: Vec) -> Result<(), crate::Error> { + match self { + UISender::Tui(sender) => sender.restart_tasks(tasks), + UISender::Wui(sender) => sender.restart_tasks(tasks), + } + } + + pub fn end_task(&self, task: String, result: TaskResult) { + match self { + UISender::Tui(sender) => sender.end_task(task, result), + UISender::Wui(sender) => sender.end_task(task, result), + } + } + + pub fn status(&self, task: String, status: String, result: CacheResult) { + match self { + UISender::Tui(sender) => sender.status(task, status, result), + UISender::Wui(sender) => sender.status(task, status, result), + } + } + fn set_stdin(&self, task: String, stdin: Box) { + match self { + UISender::Tui(sender) => sender.set_stdin(task, stdin), + UISender::Wui(sender) => sender.set_stdin(task, stdin), + } + } + + pub fn output(&self, task: String, output: Vec) -> Result<(), crate::Error> { + match self { + UISender::Tui(sender) => sender.output(task, output), + UISender::Wui(sender) => sender.output(task, output), + } + } + + /// Construct a sender configured for a specific task + pub fn task(&self, task: String) -> TaskSender { + match self { + UISender::Tui(sender) => sender.task(task), + UISender::Wui(sender) => sender.task(task), + } + } + pub fn stop(&self) { + match self { + UISender::Tui(sender) => sender.stop(), + UISender::Wui(sender) => sender.stop(), + } + } + pub fn update_tasks(&self, tasks: Vec) -> Result<(), crate::Error> { + match self { + UISender::Tui(sender) => sender.update_tasks(tasks), + UISender::Wui(sender) => sender.update_tasks(tasks), + } + } + + pub fn pane_size(&self) -> Option { + match self { + UISender::Tui(sender) => sender.pane_size(), + // Not applicable to the web UI + UISender::Wui(_) => None, + } + } +} + +#[derive(Debug, Clone)] +pub struct TaskSender { + pub(crate) name: String, + pub(crate) handle: UISender, + pub(crate) logs: Arc>>, +} + +impl TaskSender { + /// Access the underlying UISender + pub fn as_app(&self) -> &UISender { + &self.handle + } + + /// Mark the task as started + pub fn start(&self, output_logs: OutputLogs) { + self.handle.start_task(self.name.clone(), output_logs); + } + + /// Mark the task as finished + pub fn succeeded(&self, is_cache_hit: bool) -> Vec { + if is_cache_hit { + self.finish(TaskResult::CacheHit) + } else { + self.finish(TaskResult::Success) + } + } + + /// Mark the task as finished + pub fn failed(&self) -> Vec { + self.finish(TaskResult::Failure) + } + + fn finish(&self, result: TaskResult) -> Vec { + self.handle.end_task(self.name.clone(), result); + self.logs.lock().expect("logs lock poisoned").clone() + } + + pub fn set_stdin(&self, stdin: Box) { + self.handle.set_stdin(self.name.clone(), stdin); + } + + pub fn status(&self, status: &str, result: CacheResult) { + // Since this will be rendered via ratatui we any ANSI escape codes will not be + // handled. + // TODO: prevent the status from having ANSI codes in this scenario + let status = console::strip_ansi_codes(status).into_owned(); + self.handle.status(self.name.clone(), status, result); + } +} + +impl std::io::Write for TaskSender { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let task = self.name.clone(); + { + self.logs + .lock() + .expect("log lock poisoned") + .extend_from_slice(buf); + } + + self.handle + .output(task, buf.to_vec()) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "receiver dropped"))?; + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} diff --git a/crates/turborepo-ui/src/tui/event.rs b/crates/turborepo-ui/src/tui/event.rs index 6f2f8583cc846..4e6365b9b1cc2 100644 --- a/crates/turborepo-ui/src/tui/event.rs +++ b/crates/turborepo-ui/src/tui/event.rs @@ -1,3 +1,6 @@ +use async_graphql::Enum; +use serde::Serialize; + pub enum Event { StartTask { task: String, @@ -62,20 +65,20 @@ pub enum Direction { Down, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Enum)] pub enum TaskResult { Success, Failure, CacheHit, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Enum)] pub enum CacheResult { Hit, Miss, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Enum)] pub enum OutputLogs { // Entire task output is persisted after run Full, diff --git a/crates/turborepo-ui/src/tui/handle.rs b/crates/turborepo-ui/src/tui/handle.rs index 74ab3e42b225f..2b8eccff3cc44 100644 --- a/crates/turborepo-ui/src/tui/handle.rs +++ b/crates/turborepo-ui/src/tui/handle.rs @@ -1,16 +1,14 @@ -use std::{ - sync::{mpsc, Arc, Mutex}, - time::Instant, -}; +use std::{sync::mpsc, time::Instant}; use super::{ event::{CacheResult, OutputLogs, PaneSize}, - Event, TaskResult, + Error, Event, TaskResult, }; +use crate::sender::{TaskSender, UISender}; /// Struct for sending app events to TUI rendering #[derive(Debug, Clone)] -pub struct AppSender { +pub struct TuiSender { primary: mpsc::Sender, } @@ -19,15 +17,7 @@ pub struct AppReceiver { primary: mpsc::Receiver, } -/// Struct for sending events related to a specific task -#[derive(Debug, Clone)] -pub struct TuiTask { - name: String, - handle: AppSender, - logs: Arc>>, -} - -impl AppSender { +impl TuiSender { /// Create a new channel for sending app events. /// /// AppSender is meant to be held by the actual task runner @@ -43,12 +33,38 @@ impl AppSender { }, ) } +} + +impl TuiSender { + pub fn start_task(&self, task: String, output_logs: OutputLogs) { + self.primary + .send(Event::StartTask { task, output_logs }) + .ok(); + } + + pub fn end_task(&self, task: String, result: TaskResult) { + self.primary.send(Event::EndTask { task, result }).ok(); + } + + pub fn status(&self, task: String, status: String, result: CacheResult) { + self.primary + .send(Event::Status { + task, + status, + result, + }) + .ok(); + } + + pub fn set_stdin(&self, task: String, stdin: Box) { + self.primary.send(Event::SetStdin { task, stdin }).ok(); + } /// Construct a sender configured for a specific task - pub fn task(&self, task: String) -> TuiTask { - TuiTask { + pub fn task(&self, task: String) -> TaskSender { + TaskSender { name: task, - handle: self.clone(), + handle: UISender::Tui(self.clone()), logs: Default::default(), } } @@ -64,13 +80,26 @@ impl AppSender { } /// Update the list of tasks displayed in the TUI - pub fn update_tasks(&self, tasks: Vec) -> Result<(), mpsc::SendError> { - self.primary.send(Event::UpdateTasks { tasks }) + pub fn update_tasks(&self, tasks: Vec) -> Result<(), crate::Error> { + Ok(self + .primary + .send(Event::UpdateTasks { tasks }) + .map_err(|err| Error::Mpsc(err.to_string()))?) + } + + pub fn output(&self, task: String, output: Vec) -> Result<(), crate::Error> { + Ok(self + .primary + .send(Event::TaskOutput { task, output }) + .map_err(|err| Error::Mpsc(err.to_string()))?) } /// Restart the list of tasks displayed in the TUI - pub fn restart_tasks(&self, tasks: Vec) -> Result<(), mpsc::SendError> { - self.primary.send(Event::RestartTasks { tasks }) + pub fn restart_tasks(&self, tasks: Vec) -> Result<(), crate::Error> { + Ok(self + .primary + .send(Event::RestartTasks { tasks }) + .map_err(|err| Error::Mpsc(err.to_string()))?) } /// Fetches the size of the terminal pane @@ -84,7 +113,7 @@ impl AppSender { } impl AppReceiver { - /// Receive an event, producing a tick event if no events are received by + /// Receive an event, producing a tick event if no events are rec eived by /// the deadline. pub fn recv(&self, deadline: Instant) -> Result { match self.primary.recv_deadline(deadline) { @@ -94,95 +123,3 @@ impl AppReceiver { } } } - -impl TuiTask { - /// Access the underlying AppSender - pub fn as_app(&self) -> &AppSender { - &self.handle - } - - /// Mark the task as started - pub fn start(&self, output_logs: OutputLogs) { - self.handle - .primary - .send(Event::StartTask { - task: self.name.clone(), - output_logs, - }) - .ok(); - } - - /// Mark the task as finished - pub fn succeeded(&self, is_cache_hit: bool) -> Vec { - if is_cache_hit { - self.finish(TaskResult::CacheHit) - } else { - self.finish(TaskResult::Success) - } - } - - /// Mark the task as finished - pub fn failed(&self) -> Vec { - self.finish(TaskResult::Failure) - } - - fn finish(&self, result: TaskResult) -> Vec { - self.handle - .primary - .send(Event::EndTask { - task: self.name.clone(), - result, - }) - .ok(); - self.logs.lock().expect("logs lock poisoned").clone() - } - - pub fn set_stdin(&self, stdin: Box) { - self.handle - .primary - .send(Event::SetStdin { - task: self.name.clone(), - stdin, - }) - .ok(); - } - - pub fn status(&self, status: &str, result: CacheResult) { - // Since this will be rendered via ratatui we any ANSI escape codes will not be - // handled. - // TODO: prevent the status from having ANSI codes in this scenario - let status = console::strip_ansi_codes(status).into_owned(); - self.handle - .primary - .send(Event::Status { - task: self.name.clone(), - status, - result, - }) - .ok(); - } -} - -impl std::io::Write for TuiTask { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let task = self.name.clone(); - { - self.logs - .lock() - .expect("log lock poisoned") - .extend_from_slice(buf); - } - self.handle - .primary - .send(Event::TaskOutput { - task, - output: buf.to_vec(), - }) - .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "receiver dropped"))?; - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} diff --git a/crates/turborepo-ui/src/tui/mod.rs b/crates/turborepo-ui/src/tui/mod.rs index 99735be667c01..6b0ce05538777 100644 --- a/crates/turborepo-ui/src/tui/mod.rs +++ b/crates/turborepo-ui/src/tui/mod.rs @@ -16,7 +16,7 @@ pub use app::{run_app, terminal_big_enough}; use clipboard::copy_to_clipboard; use debouncer::Debouncer; use event::{Event, TaskResult}; -pub use handle::{AppReceiver, AppSender, TuiTask}; +pub use handle::{AppReceiver, TuiSender}; use input::{input, InputOptions}; pub use pane::TerminalPane; use size::SizeInfo; @@ -25,6 +25,8 @@ pub use term_output::TerminalOutput; #[derive(Debug, thiserror::Error)] pub enum Error { + #[error("failed to send event to TUI: {0}")] + Mpsc(String), #[error("No task found with name '{name}'")] TaskNotFound { name: String }, #[error("No task at index {index} (only {len} tasks) ")] diff --git a/crates/turborepo-ui/src/wui/event.rs b/crates/turborepo-ui/src/wui/event.rs new file mode 100644 index 0000000000000..df9cb12eb2c53 --- /dev/null +++ b/crates/turborepo-ui/src/wui/event.rs @@ -0,0 +1,34 @@ +use serde::Serialize; + +use crate::tui::event::{CacheResult, OutputLogs, TaskResult}; + +/// Specific events that the GraphQL server can send to the client, +/// not all the `Event` types from the TUI. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", content = "payload")] +pub enum WebUIEvent { + StartTask { + task: String, + output_logs: OutputLogs, + }, + TaskOutput { + task: String, + output: Vec, + }, + EndTask { + task: String, + result: TaskResult, + }, + CacheStatus { + task: String, + message: String, + result: CacheResult, + }, + UpdateTasks { + tasks: Vec, + }, + RestartTasks { + tasks: Vec, + }, + Stop, +} diff --git a/crates/turborepo-ui/src/wui/mod.rs b/crates/turborepo-ui/src/wui/mod.rs new file mode 100644 index 0000000000000..3eb2384c846c2 --- /dev/null +++ b/crates/turborepo-ui/src/wui/mod.rs @@ -0,0 +1,24 @@ +//! Web UI for Turborepo. Creates a WebSocket server that can be subscribed to +//! by a web client to display the status of tasks. + +mod event; +pub mod sender; +pub mod server; +mod subscriber; + +use event::WebUIEvent; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("failed to start server")] + Server(#[from] std::io::Error), + #[error("failed to start websocket server: {0}")] + WebSocket(#[source] axum::Error), + #[error("failed to serialize message: {0}")] + Serde(#[from] serde_json::Error), + #[error("failed to send message")] + Send(#[from] axum::Error), + #[error("failed to send message through channel")] + Broadcast(#[from] tokio::sync::mpsc::error::SendError), +} diff --git a/crates/turborepo-ui/src/wui/sender.rs b/crates/turborepo-ui/src/wui/sender.rs new file mode 100644 index 0000000000000..80c0b0645caf6 --- /dev/null +++ b/crates/turborepo-ui/src/wui/sender.rs @@ -0,0 +1,78 @@ +use std::io::Write; + +use tracing::log::warn; + +use crate::{ + sender::{TaskSender, UISender}, + tui::event::{CacheResult, OutputLogs, TaskResult}, + wui::{event::WebUIEvent, Error}, +}; + +#[derive(Debug, Clone)] +pub struct WebUISender { + pub tx: tokio::sync::mpsc::UnboundedSender, +} + +impl WebUISender { + pub fn new(tx: tokio::sync::mpsc::UnboundedSender) -> Self { + Self { tx } + } + pub fn start_task(&self, task: String, output_logs: OutputLogs) { + self.tx + .send(WebUIEvent::StartTask { task, output_logs }) + .ok(); + } + + pub fn restart_tasks(&self, tasks: Vec) -> Result<(), crate::Error> { + self.tx + .send(WebUIEvent::RestartTasks { tasks }) + .map_err(Error::Broadcast)?; + Ok(()) + } + + pub fn end_task(&self, task: String, result: TaskResult) { + self.tx.send(WebUIEvent::EndTask { task, result }).ok(); + } + + pub fn status(&self, task: String, message: String, result: CacheResult) { + self.tx + .send(WebUIEvent::CacheStatus { + task, + message, + result, + }) + .ok(); + } + + pub fn set_stdin(&self, _: String, _: Box) { + warn!("stdin is not supported (yet) in web ui"); + } + + pub fn task(&self, task: String) -> TaskSender { + TaskSender { + name: task, + handle: UISender::Wui(self.clone()), + logs: Default::default(), + } + } + + pub fn stop(&self) { + self.tx.send(WebUIEvent::Stop).ok(); + } + + pub fn update_tasks(&self, tasks: Vec) -> Result<(), crate::Error> { + self.tx + .send(WebUIEvent::UpdateTasks { tasks }) + .map_err(Error::Broadcast)?; + + Ok(()) + } + + pub fn output(&self, task: String, output: Vec) -> Result<(), crate::Error> { + self.tx + .send(WebUIEvent::TaskOutput { task, output }) + .map_err(Error::Broadcast)?; + + Ok(()) + } +} diff --git a/crates/turborepo-ui/src/wui/server.rs b/crates/turborepo-ui/src/wui/server.rs new file mode 100644 index 0000000000000..81858b4b579cb --- /dev/null +++ b/crates/turborepo-ui/src/wui/server.rs @@ -0,0 +1,110 @@ +use std::sync::Arc; + +use async_graphql::{ + http::GraphiQLSource, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject, +}; +use async_graphql_axum::GraphQL; +use axum::{http::Method, response, response::IntoResponse, routing::get, Router}; +use serde::Serialize; +use tokio::{net::TcpListener, sync::Mutex}; +use tower_http::cors::{Any, CorsLayer}; + +use crate::wui::{ + event::WebUIEvent, + subscriber::{Subscriber, TaskState, WebUIState}, + Error, +}; + +#[derive(Debug, Clone, Serialize, SimpleObject)] +struct Task { + name: String, + state: TaskState, +} + +struct CurrentRun<'a> { + state: &'a SharedState, +} + +#[Object] +impl<'a> CurrentRun<'a> { + async fn tasks(&self) -> Vec { + self.state + .lock() + .await + .tasks() + .iter() + .map(|(task, state)| Task { + name: task.clone(), + state: state.clone(), + }) + .collect() + } +} + +/// We keep the state in a `Arc>>` so both `Subscriber` and +/// `Query` can access it, with `Subscriber` mutating it and `Query` only +/// reading it. +pub(crate) type SharedState = Arc>; + +pub struct Query { + state: SharedState, +} + +impl Query { + pub fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[Object] +impl Query { + async fn current_run(&self) -> CurrentRun { + CurrentRun { state: &self.state } + } +} + +async fn graphiql() -> impl IntoResponse { + response::Html( + GraphiQLSource::build() + .endpoint("/") + .subscription_endpoint("/subscriptions") + .finish(), + ) +} + +pub async fn start_server( + rx: tokio::sync::mpsc::UnboundedReceiver, +) -> Result<(), crate::Error> { + let state = Arc::new(Mutex::new(WebUIState::default())); + let subscriber = Subscriber::new(rx); + tokio::spawn(subscriber.watch(state.clone())); + + run_server(state.clone()).await?; + + Ok(()) +} + +pub(crate) async fn run_server(state: SharedState) -> Result<(), crate::Error> { + let cors = CorsLayer::new() + // allow `GET` and `POST` when accessing the resource + .allow_methods([Method::GET, Method::POST]) + .allow_headers(Any) + // allow requests from any origin + .allow_origin(Any); + + let schema = Schema::new(Query { state }, EmptyMutation, EmptySubscription); + let app = Router::new() + .route("/", get(graphiql).post_service(GraphQL::new(schema))) + .layer(cors); + + axum::serve( + TcpListener::bind("127.0.0.1:8000") + .await + .map_err(Error::Server)?, + app, + ) + .await + .map_err(Error::Server)?; + + Ok(()) +} diff --git a/crates/turborepo-ui/src/wui/subscriber.rs b/crates/turborepo-ui/src/wui/subscriber.rs new file mode 100644 index 0000000000000..c8196c2ba4aba --- /dev/null +++ b/crates/turborepo-ui/src/wui/subscriber.rs @@ -0,0 +1,276 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use async_graphql::{Enum, SimpleObject}; +use serde::Serialize; +use tokio::sync::Mutex; + +use crate::{ + tui::event::{CacheResult, TaskResult}, + wui::{event::WebUIEvent, server::SharedState}, +}; + +/// Subscribes to the Web UI events and updates the state +pub struct Subscriber { + rx: tokio::sync::mpsc::UnboundedReceiver, +} + +impl Subscriber { + pub fn new(rx: tokio::sync::mpsc::UnboundedReceiver) -> Self { + Self { rx } + } + + pub async fn watch( + self, + // We use a tokio::sync::Mutex here because we want this future to be Send. + #[allow(clippy::type_complexity)] state: SharedState, + ) { + let mut rx = self.rx; + while let Some(event) = rx.recv().await { + Self::add_message(&state, event).await; + } + } + + async fn add_message(state: &Arc>, event: WebUIEvent) { + let mut state = state.lock().await; + + match event { + WebUIEvent::StartTask { + task, + output_logs: _, + } => { + state.tasks.insert( + task, + TaskState { + output: Vec::new(), + status: TaskStatus::Running, + cache_result: None, + cache_message: None, + }, + ); + } + WebUIEvent::TaskOutput { task, output } => { + state.tasks.get_mut(&task).unwrap().output.extend(output); + } + WebUIEvent::EndTask { task, result } => { + state.tasks.get_mut(&task).unwrap().status = TaskStatus::from(result); + } + WebUIEvent::CacheStatus { + task, + result, + message, + } => { + if result == CacheResult::Hit { + state.tasks.get_mut(&task).unwrap().status = TaskStatus::Cached; + } + state.tasks.get_mut(&task).unwrap().cache_result = Some(result); + state.tasks.get_mut(&task).unwrap().cache_message = Some(message); + } + WebUIEvent::Stop => { + // TODO: stop watching + } + WebUIEvent::UpdateTasks { tasks } => { + state.tasks = tasks + .into_iter() + .map(|task| { + ( + task, + TaskState { + output: Vec::new(), + status: TaskStatus::Pending, + cache_result: None, + cache_message: None, + }, + ) + }) + .collect(); + } + WebUIEvent::RestartTasks { tasks } => { + state.tasks = tasks + .into_iter() + .map(|task| { + ( + task, + TaskState { + output: Vec::new(), + status: TaskStatus::Running, + cache_result: None, + cache_message: None, + }, + ) + }) + .collect(); + } + } + } +} + +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq, Enum)] +pub enum TaskStatus { + Pending, + Running, + Cached, + Failed, + Succeeded, +} + +impl From for TaskStatus { + fn from(result: TaskResult) -> Self { + match result { + TaskResult::Success => Self::Succeeded, + TaskResult::CacheHit => Self::Cached, + TaskResult::Failure => Self::Failed, + } + } +} + +#[derive(Debug, Clone, Serialize, SimpleObject)] +pub struct TaskState { + output: Vec, + status: TaskStatus, + cache_result: Option, + /// The message for the cache status, i.e. `cache hit, replaying logs` + cache_message: Option, +} + +#[derive(Debug, Default, Clone, Serialize)] +pub struct WebUIState { + tasks: BTreeMap, +} + +impl WebUIState { + pub fn tasks(&self) -> &BTreeMap { + &self.tasks + } +} + +#[cfg(test)] +mod test { + use async_graphql::{EmptyMutation, EmptySubscription, Schema}; + + use super::*; + use crate::{ + tui::event::OutputLogs, + wui::{sender::WebUISender, server::Query}, + }; + + #[tokio::test] + async fn test_web_ui_state() -> Result<(), crate::Error> { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let state = Arc::new(Mutex::new(WebUIState::default())); + let subscriber = Subscriber::new(rx); + + let sender = WebUISender::new(tx); + + // Start a successful task + sender.start_task("task".to_string(), OutputLogs::Full); + sender.output("task".to_string(), b"this is my output".to_vec())?; + sender.end_task("task".to_string(), TaskResult::Success); + + // Start a cached task + sender.start_task("task2".to_string(), OutputLogs::Full); + sender.status("task2".to_string(), "status".to_string(), CacheResult::Hit); + + // Start a failing task + sender.start_task("task3".to_string(), OutputLogs::Full); + sender.end_task("task3".to_string(), TaskResult::Failure); + + // Drop the sender so the subscriber can terminate + drop(sender); + + // Run the subscriber blocking + subscriber.watch(state.clone()).await; + + let state_handle = state.lock().await.clone(); + assert_eq!(state_handle.tasks().len(), 3); + assert_eq!( + state_handle.tasks().get("task2").unwrap().status, + TaskStatus::Cached + ); + assert_eq!( + state_handle.tasks().get("task").unwrap().status, + TaskStatus::Succeeded + ); + assert_eq!( + state_handle.tasks().get("task").unwrap().output, + b"this is my output" + ); + assert_eq!( + state_handle.tasks().get("task3").unwrap().status, + TaskStatus::Failed + ); + + // Now let's check with the GraphQL API + let schema = Schema::new(Query::new(state), EmptyMutation, EmptySubscription); + let result = schema + .execute("query { currentRun { tasks { name state { status } } } }") + .await; + assert!(result.errors.is_empty()); + assert_eq!( + result.data, + async_graphql::Value::from_json(serde_json::json!({ + "currentRun": { + "tasks": [ + { + "name": "task", + "state": { + "status": "SUCCEEDED" + } + }, + { + "name": "task2", + "state": { + "status": "CACHED" + } + }, + { + "name": "task3", + "state": { + "status": "FAILED" + } + } + ] + } + })) + .unwrap() + ); + + Ok(()) + } + + #[tokio::test] + async fn test_restart_tasks() -> Result<(), crate::Error> { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let state = Arc::new(Mutex::new(WebUIState::default())); + let subscriber = Subscriber::new(rx); + + let sender = WebUISender::new(tx); + + // Start a successful task + sender.start_task("task".to_string(), OutputLogs::Full); + sender.output("task".to_string(), b"this is my output".to_vec())?; + sender.end_task("task".to_string(), TaskResult::Success); + + // Start a cached task + sender.start_task("task2".to_string(), OutputLogs::Full); + sender.status("task2".to_string(), "status".to_string(), CacheResult::Hit); + + // Restart a task + sender.restart_tasks(vec!["task".to_string()])?; + + // Drop the sender so the subscriber can terminate + drop(sender); + + // Run the subscriber blocking + subscriber.watch(state.clone()).await; + + let state_handle = state.lock().await.clone(); + assert_eq!(state_handle.tasks().len(), 1); + assert_eq!( + state_handle.tasks().get("task").unwrap().status, + TaskStatus::Running + ); + assert!(state_handle.tasks().get("task").unwrap().output.is_empty()); + + Ok(()) + } +} diff --git a/turborepo-tests/integration/tests/no-args.t b/turborepo-tests/integration/tests/no-args.t index aac1c861c166a..88405d0e2aaa3 100644 --- a/turborepo-tests/integration/tests/no-args.t +++ b/turborepo-tests/integration/tests/no-args.t @@ -39,7 +39,7 @@ Make sure exit code is 2 when no args are passed --heap Specify a file to save a pprof heap profile --ui - Specify whether to use the streaming UI or TUI [possible values: tui, stream] + Specify whether to use the streaming UI or TUI [possible values: tui, stream, web] --login Override the login endpoint --no-color diff --git a/turborepo-tests/integration/tests/turbo-help.t b/turborepo-tests/integration/tests/turbo-help.t index 615ebaf71c0a9..05ca151029722 100644 --- a/turborepo-tests/integration/tests/turbo-help.t +++ b/turborepo-tests/integration/tests/turbo-help.t @@ -39,7 +39,7 @@ Test help flag --heap Specify a file to save a pprof heap profile --ui - Specify whether to use the streaming UI or TUI [possible values: tui, stream] + Specify whether to use the streaming UI or TUI [possible values: tui, stream, web] --login Override the login endpoint --no-color @@ -171,6 +171,7 @@ Test help flag Possible values: - tui: Use the terminal user interface - stream: Use the standard output stream + - web: Use the web user interface (experimental) --login Override the login endpoint @@ -344,7 +345,7 @@ Test help flag for link command --heap Specify a file to save a pprof heap profile --ui - Specify whether to use the streaming UI or TUI [possible values: tui, stream] + Specify whether to use the streaming UI or TUI [possible values: tui, stream, web] --login Override the login endpoint --no-color @@ -392,7 +393,7 @@ Test help flag for unlink command --heap Specify a file to save a pprof heap profile --ui - Specify whether to use the streaming UI or TUI [possible values: tui, stream] + Specify whether to use the streaming UI or TUI [possible values: tui, stream, web] --login Override the login endpoint --no-color @@ -442,7 +443,7 @@ Test help flag for login command --heap Specify a file to save a pprof heap profile --ui - Specify whether to use the streaming UI or TUI [possible values: tui, stream] + Specify whether to use the streaming UI or TUI [possible values: tui, stream, web] --login Override the login endpoint --no-color @@ -490,7 +491,7 @@ Test help flag for logout command --heap Specify a file to save a pprof heap profile --ui - Specify whether to use the streaming UI or TUI [possible values: tui, stream] + Specify whether to use the streaming UI or TUI [possible values: tui, stream, web] --login Override the login endpoint --no-color diff --git a/web-ui/next-env.d.ts b/web-ui/next-env.d.ts deleted file mode 100644 index 4f11a03dc6cc3..0000000000000 --- a/web-ui/next-env.d.ts +++ /dev/null @@ -1,5 +0,0 @@ -/// -/// - -// NOTE: This file should not be edited -// see https://nextjs.org/docs/basic-features/typescript for more information.