diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index fa3148a6df2860..21b5cc278619e0 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ b/crates/turborepo-lib/src/commands/run.rs @@ -53,7 +53,7 @@ pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result, + ui_sender: Option, is_watch: bool, ) -> Result { if let Some(subscriber) = self.signal_handler.subscribe() { @@ -403,7 +404,7 @@ impl Run { self.processes.clone(), &self.repo_root, global_env, - experimental_ui_sender, + ui_sender, is_watch, ); diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 7b13f6ed148c90..99afab382358b2 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -10,7 +10,7 @@ use tokio::{ }; use turborepo_repository::package_graph::PackageName; use turborepo_telemetry::events::command::CommandEventBuilder; -use turborepo_ui::{tui, tui::AppSender}; +use turborepo_ui::{sender::UISender, tui, tui::AppSender}; use crate::{ cli::{Command, RunArgs}, diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs index 27f301ca42c3b8..6925934092c6d3 100644 --- a/crates/turborepo-lib/src/task_graph/visitor.rs +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -25,7 +25,9 @@ use turborepo_telemetry::events::{ generic::GenericEventBuilder, task::PackageTaskEventBuilder, EventBuilder, TrackedErrors, }; use turborepo_ui::{ - tui::{self, event::CacheResult, AppSender, TuiTask}, + sender::{TaskSender, UISender}, + tui::{self, event::CacheResult, AppSender}, + wui::WebUISender, ColorSelector, OutputClient, OutputSink, OutputWriter, PrefixedUI, UI, }; use which::which; @@ -49,7 +51,7 @@ use crate::{ }; // This holds the whole world -pub struct Visitor<'a> { +pub struct Visitor<'a, U> { color_cache: ColorSelector, dry: bool, global_env: EnvironmentVariableMap, @@ -64,8 +66,8 @@ pub struct Visitor<'a> { sink: OutputSink, task_hasher: TaskHasher<'a>, ui: UI, - experimental_ui_sender: Option, is_watch: bool, + ui_sender: Option, } #[derive(Debug, thiserror::Error, Diagnostic)] @@ -98,7 +100,7 @@ pub enum Error { InternalErrors(String), } -impl<'a> Visitor<'a> { +impl<'a, U: UISender + 'static> Visitor<'a, U> { // Disabling this lint until we stop adding state to the visitor. // Once we have the full picture we will go about grouping these pieces of data // together @@ -117,7 +119,7 @@ impl<'a> Visitor<'a> { manager: ProcessManager, repo_root: &'a AbsoluteSystemPath, global_env: EnvironmentVariableMap, - experimental_ui_sender: Option, + ui_sender: Option, is_watch: bool, ) -> Self { let task_hasher = TaskHasher::new( @@ -145,7 +147,7 @@ impl<'a> Visitor<'a> { task_hasher, ui, global_env, - experimental_ui_sender, + ui_sender, is_watch, } } @@ -278,7 +280,7 @@ impl<'a> Visitor<'a> { let vendor_behavior = Vendor::infer().and_then(|vendor| vendor.behavior.as_ref()); - let output_client = if let Some(handle) = &self.experimental_ui_sender { + let output_client = if let Some(handle) = &self.ui_sender { TaskOutput::UI(handle.task(info.to_string())) } else { TaskOutput::Direct(self.output_client(&info, vendor_behavior)) @@ -316,7 +318,7 @@ impl<'a> Visitor<'a> { drop(factory); if !self.is_watch { - if let Some(handle) = &self.experimental_ui_sender { + if let Some(handle) = &self.ui_sender { handle.stop(); } } @@ -491,8 +493,8 @@ impl<'a> Visitor<'a> { pub fn dry_run(&mut self) { self.dry = true; - // No need to start a TUI on dry run - self.experimental_ui_sender = None; + // No need to start a UI on dry run + self.ui_sender = None; } } @@ -544,9 +546,9 @@ impl std::io::Write for StdWriter { /// Small wrapper over our two output types that defines a shared interface for /// interacting with them. -enum TaskOutput { +enum TaskOutput { Direct(OutputClient), - UI(tui::TuiTask), + UI(TaskSender), } fn turbo_regex() -> &'static Regex { @@ -624,16 +626,16 @@ impl TaskErrorCause { } } -struct ExecContextFactory<'a> { - visitor: &'a Visitor<'a>, +struct ExecContextFactory<'a, U> { + visitor: &'a Visitor<'a, U>, errors: Arc>>, manager: ProcessManager, engine: &'a Arc, } -impl<'a> ExecContextFactory<'a> { +impl<'a, U: UISender> ExecContextFactory<'a, U> { pub fn new( - visitor: &'a Visitor, + visitor: &'a Visitor<'a, U>, errors: Arc>>, manager: ProcessManager, engine: &'a Arc, @@ -662,7 +664,7 @@ impl<'a> ExecContextFactory<'a> { ExecContext { engine: self.engine.clone(), ui: self.visitor.ui, - experimental_ui: self.visitor.experimental_ui_sender.is_some(), + experimental_ui: self.visitor.ui_sender.is_some(), is_github_actions: self.visitor.run_opts.is_github_actions, pretty_prefix: self .visitor @@ -750,7 +752,7 @@ impl ExecContext { &mut self, parent_span_id: Option, tracker: TaskTracker<()>, - output_client: TaskOutput, + output_client: TaskOutput, callback: oneshot::Sender>, spaces_client: Option, telemetry: &PackageTaskEventBuilder, @@ -839,25 +841,27 @@ impl ExecContext { Ok(()) } - fn prefixed_ui<'a, W: Write>( + fn prefixed_ui<'a, W: Write, U: UISender>( &self, - output_client: &'a TaskOutput, - ) -> TaskCacheOutput> { + output_client: &'a TaskOutput, + ) -> TaskCacheOutput, U> { match output_client { - TaskOutput::Direct(client) => TaskCacheOutput::Direct(Visitor::prefixed_ui( - self.ui, - self.is_github_actions, - client.stdout(), - client.stderr(), - self.pretty_prefix.clone(), - )), + TaskOutput::Direct(client) => { + TaskCacheOutput::Direct(Visitor::::prefixed_ui( + self.ui, + self.is_github_actions, + client.stdout(), + client.stderr(), + self.pretty_prefix.clone(), + )) + } TaskOutput::UI(task) => TaskCacheOutput::UI(task.clone()), } } async fn execute_inner( &mut self, - output_client: &TaskOutput, + output_client: &TaskOutput, telemetry: &PackageTaskEventBuilder, ) -> Result { let task_start = Instant::now(); @@ -1084,13 +1088,13 @@ impl DryRunExecContext { } /// Struct for displaying information about task's cache -enum TaskCacheOutput { +enum TaskCacheOutput { Direct(PrefixedUI), - UI(TuiTask), + UI(TaskSender), } -impl TaskCacheOutput { - fn task_writer(&mut self) -> Either, TuiTask> { +impl TaskCacheOutput { + fn task_writer(&mut self) -> Either, TaskSender> { match self { TaskCacheOutput::Direct(prefixed) => Either::Left(prefixed.output_prefixed_writer()), TaskCacheOutput::UI(task) => Either::Right(task.clone()), @@ -1107,7 +1111,7 @@ impl TaskCacheOutput { } } -impl CacheOutput for TaskCacheOutput { +impl CacheOutput for TaskCacheOutput { fn status(&mut self, message: &str, result: CacheResult) { match self { TaskCacheOutput::Direct(direct) => direct.output(message), @@ -1136,7 +1140,7 @@ impl CacheOutput for TaskCacheOutput { } /// Struct for displaying information about task -impl TaskOutput { +impl TaskOutput { pub fn finish(self, use_error: bool, is_cache_hit: bool) -> std::io::Result>> { match self { TaskOutput::Direct(client) => client.finish(use_error), @@ -1145,21 +1149,21 @@ impl TaskOutput { } } - pub fn stdout(&self) -> Either, TuiTask> { + pub fn stdout(&self) -> Either, TaskSender> { match self { TaskOutput::Direct(client) => Either::Left(client.stdout()), TaskOutput::UI(client) => Either::Right(client.clone()), } } - pub fn stderr(&self) -> Either, TuiTask> { + pub fn stderr(&self) -> Either, TaskSender> { match self { TaskOutput::Direct(client) => Either::Left(client.stderr()), TaskOutput::UI(client) => Either::Right(client.clone()), } } - pub fn task_logs(&self) -> Either, TuiTask> { + pub fn task_logs(&self) -> Either, TaskSender> { match self { TaskOutput::Direct(client) => Either::Left(client.stdout()), TaskOutput::UI(client) => Either::Right(client.clone()), diff --git a/crates/turborepo-ui/src/lib.rs b/crates/turborepo-ui/src/lib.rs index 1360b75a5ee791..d4a153d75eb05d 100644 --- a/crates/turborepo-ui/src/lib.rs +++ b/crates/turborepo-ui/src/lib.rs @@ -9,6 +9,7 @@ mod line; mod logs; mod output; mod prefixed; +pub mod sender; pub mod tui; pub mod wui; diff --git a/crates/turborepo-ui/src/sender.rs b/crates/turborepo-ui/src/sender.rs new file mode 100644 index 00000000000000..75bf47320b9b16 --- /dev/null +++ b/crates/turborepo-ui/src/sender.rs @@ -0,0 +1,73 @@ +use std::{ + io::Write, + sync::{Arc, Mutex}, +}; + +use crate::tui::event::{CacheResult, OutputLogs, TaskResult}; + +/// Trait to abstract over sending events to either the TUI or the Web UI +pub trait UISender: Clone + Send + Sync + 'static { + type Error; + fn start_task(&self, task: String, output_logs: OutputLogs); + fn end_task(&self, task: String, result: TaskResult); + + fn status(&self, task: String, status: String, result: CacheResult); + fn set_stdin(&self, task: String, stdin: Box); + + fn output(&self, task: String, output: Vec) -> Result<(), Self::Error>; + fn task(&self, task: String) -> TaskSender + where + Self: Sized; + fn stop(&self); + fn update_tasks(&self, tasks: Vec) -> Result<(), Self::Error>; +} + +#[derive(Debug, Clone)] +pub struct TaskSender { + pub(crate) name: String, + pub(crate) handle: T, + pub(crate) logs: Arc>>, +} + +impl TaskSender { + /// Access the underlying AppSender + pub fn as_app(&self) -> &T { + &self.handle + } + + /// Mark the task as started + pub fn start(&self, output_logs: OutputLogs) { + self.handle.start_task(self.name.clone(), output_logs); + } + + /// Mark the task as finished + pub fn succeeded(&self, is_cache_hit: bool) -> Vec { + if is_cache_hit { + self.finish(TaskResult::CacheHit) + } else { + self.finish(TaskResult::Success) + } + } + + /// Mark the task as finished + pub fn failed(&self) -> Vec { + self.finish(TaskResult::Failure) + } + + fn finish(&self, result: TaskResult) -> Vec { + self.handle.end_task(self.name.clone(), result); + self.logs.lock().expect("logs lock poisoned").clone() + } + + pub fn set_stdin(&self, stdin: Box) { + self.handle.set_stdin(self.name.clone(), stdin); + } + + pub fn status(&self, status: &str, result: CacheResult) { + // Since this will be rendered via ratatui we any ANSI escape codes will not be + // handled. + // TODO: prevent the status from having ANSI codes in this scenario + let status = console::strip_ansi_codes(status).into_owned(); + self.handle.status(self.name.clone(), status, result); + } +} diff --git a/crates/turborepo-ui/src/tui/handle.rs b/crates/turborepo-ui/src/tui/handle.rs index fe5ac0aa3a0f7d..11f4d30d105f76 100644 --- a/crates/turborepo-ui/src/tui/handle.rs +++ b/crates/turborepo-ui/src/tui/handle.rs @@ -7,6 +7,7 @@ use super::{ event::{CacheResult, OutputLogs}, Event, TaskResult, }; +use crate::sender::{TaskSender, UISender}; /// Struct for sending app events to TUI rendering #[derive(Debug, Clone)] @@ -19,14 +20,6 @@ pub struct AppReceiver { primary: mpsc::Receiver, } -/// Struct for sending events related to a specific task -#[derive(Debug, Clone)] -pub struct TuiTask { - name: String, - handle: AppSender, - logs: Arc>>, -} - impl AppSender { /// Create a new channel for sending app events. /// @@ -43,10 +36,36 @@ impl AppSender { }, ) } +} + +impl UISender for AppSender { + fn start_task(&self, task: String, output_logs: OutputLogs) { + self.primary + .send(Event::StartTask { task, output_logs }) + .ok(); + } + + fn end_task(&self, task: String, result: TaskResult) { + self.primary.send(Event::EndTask { task, result }).ok(); + } + + fn status(&self, task: String, status: String, result: CacheResult) { + self.primary + .send(Event::Status { + task, + status, + result, + }) + .ok(); + } + + fn set_stdin(&self, task: String, stdin: Box) { + self.primary.send(Event::SetStdin { task, stdin }).ok(); + } /// Construct a sender configured for a specific task - pub fn task(&self, task: String) -> TuiTask { - TuiTask { + fn task(&self, task: String) -> TaskSender { + TaskSender { name: task, handle: self.clone(), logs: Default::default(), @@ -54,7 +73,7 @@ impl AppSender { } /// Stop rendering TUI and restore terminal to default configuration - pub fn stop(&self) { + fn stop(&self) { let (callback_tx, callback_rx) = mpsc::sync_channel(1); // Send stop event, if receiver has dropped ignore error as // it'll be a no-op. @@ -63,14 +82,20 @@ impl AppSender { callback_rx.recv().ok(); } + type Error = mpsc::SendError; + /// Update the list of tasks displayed in the TUI - pub fn update_tasks(&self, tasks: Vec) -> Result<(), mpsc::SendError> { + fn update_tasks(&self, tasks: Vec) -> Result<(), Self::Error> { self.primary.send(Event::UpdateTasks { tasks }) } + + fn output(&self, task: String, output: Vec) -> Result<(), Self::Error> { + self.primary.send(Event::TaskOutput { task, output }) + } } impl AppReceiver { - /// Receive an event, producing a tick event if no events are received by + /// Receive an event, producing a tick event if no events are rec eived by /// the deadline. pub fn recv(&self, deadline: Instant) -> Result { match self.primary.recv_deadline(deadline) { @@ -81,75 +106,7 @@ impl AppReceiver { } } -impl TuiTask { - /// Access the underlying AppSender - pub fn as_app(&self) -> &AppSender { - &self.handle - } - - /// Mark the task as started - pub fn start(&self, output_logs: OutputLogs) { - self.handle - .primary - .send(Event::StartTask { - task: self.name.clone(), - output_logs, - }) - .ok(); - } - - /// Mark the task as finished - pub fn succeeded(&self, is_cache_hit: bool) -> Vec { - if is_cache_hit { - self.finish(TaskResult::CacheHit) - } else { - self.finish(TaskResult::Success) - } - } - - /// Mark the task as finished - pub fn failed(&self) -> Vec { - self.finish(TaskResult::Failure) - } - - fn finish(&self, result: TaskResult) -> Vec { - self.handle - .primary - .send(Event::EndTask { - task: self.name.clone(), - result, - }) - .ok(); - self.logs.lock().expect("logs lock poisoned").clone() - } - - pub fn set_stdin(&self, stdin: Box) { - self.handle - .primary - .send(Event::SetStdin { - task: self.name.clone(), - stdin, - }) - .ok(); - } - - pub fn status(&self, status: &str, result: CacheResult) { - // Since this will be rendered via ratatui we any ANSI escape codes will not be - // handled. - // TODO: prevent the status from having ANSI codes in this scenario - let status = console::strip_ansi_codes(status).into_owned(); - self.handle - .primary - .send(Event::Status { - task: self.name.clone(), - status, - result, - }) - .ok(); - } -} - -impl std::io::Write for TuiTask { +impl std::io::Write for TaskSender { fn write(&mut self, buf: &[u8]) -> std::io::Result { let task = self.name.clone(); { @@ -158,12 +115,9 @@ impl std::io::Write for TuiTask { .expect("log lock poisoned") .extend_from_slice(buf); } + self.handle - .primary - .send(Event::TaskOutput { - task, - output: buf.to_vec(), - }) + .output(task, buf.to_vec()) .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "receiver dropped"))?; Ok(buf.len()) } diff --git a/crates/turborepo-ui/src/tui/mod.rs b/crates/turborepo-ui/src/tui/mod.rs index 93a016ecc04e03..a614dbd8ce8d17 100644 --- a/crates/turborepo-ui/src/tui/mod.rs +++ b/crates/turborepo-ui/src/tui/mod.rs @@ -12,7 +12,7 @@ mod term_output; pub use app::{run_app, terminal_big_enough}; use clipboard::copy_to_clipboard; use event::{Event, TaskResult}; -pub use handle::{AppReceiver, AppSender, TuiTask}; +pub use handle::{AppReceiver, AppSender}; use input::{input, InputOptions}; pub use pane::TerminalPane; pub use table::TaskTable; diff --git a/crates/turborepo-ui/src/wui/mod.rs b/crates/turborepo-ui/src/wui/mod.rs index 734062713aa5cc..1cfb4dd3f04dd5 100644 --- a/crates/turborepo-ui/src/wui/mod.rs +++ b/crates/turborepo-ui/src/wui/mod.rs @@ -1,6 +1,8 @@ //! Web UI for Turborepo. Creates a WebSocket server that can be subscribed to //! by a web client to display the status of tasks. +use std::io::Write; + use axum::{ extract::{ ws::{Message, WebSocket}, @@ -13,7 +15,10 @@ use axum::{ use serde::Serialize; use thiserror::Error; -use crate::tui::event::{CacheResult, Event, OutputLogs, TaskResult}; +use crate::{ + sender::{TaskSender, UISender}, + tui::event::{CacheResult, Event, OutputLogs, TaskResult}, +}; #[derive(Debug, Error)] pub enum Error { @@ -23,14 +28,54 @@ pub enum Error { WebSocket(#[source] axum::Error), } +#[derive(Clone)] pub struct WebUISender { pub tx: tokio::sync::broadcast::Sender, } +impl UISender for WebUISender { + type Error = std::io::Error; + + fn start_task(&self, task: String, output_logs: OutputLogs) { + todo!() + } + + fn end_task(&self, task: String, result: TaskResult) { + todo!() + } + + fn status(&self, task: String, status: String, result: CacheResult) { + todo!() + } + + fn set_stdin(&self, task: String, stdin: Box) { + todo!() + } + + fn task(&self, task: String) -> TaskSender + where + Self: Sized, + { + todo!() + } + + fn stop(&self) { + todo!() + } + + fn update_tasks(&self, tasks: Vec) -> Result<(), Self::Error> { + todo!() + } + + fn output(&self, task: String, output: Vec) -> Result<(), Self::Error> { + todo!() + } +} + // Specific events that the websocket server can send to the client, // not all the `Event` types from the TUI #[derive(Debug, Clone, Serialize)] -enum WebUIEvent { +pub enum WebUIEvent { StartTask { task: String, output_logs: OutputLogs,