Skip to content

Commit

Permalink
Factored out the UI Sender methods into a trait
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasLYang committed Jul 31, 2024
1 parent 8354fbe commit 7c152be
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 133 deletions.
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result<i3
}

if let (Some(handle), Some(sender)) = (handle, sender) {
sender.stop();
//sender.stop();
if let Err(e) = handle.await.expect("render thread panicked") {
error!("error encountered rendering tui: {e}");
}
Expand Down
7 changes: 4 additions & 3 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use turborepo_repository::package_graph::{PackageGraph, PackageName, PackageNode
use turborepo_scm::SCM;
use turborepo_telemetry::events::generic::GenericEventBuilder;
use turborepo_ui::{
cprint, cprintln, tui, tui::AppSender, wui, wui::WebUISender, BOLD_GREY, GREY, UI,
cprint, cprintln, sender::UISender, tui, tui::AppSender, wui, wui::WebUISender, BOLD_GREY,
GREY, UI,
};

pub use crate::run::error::Error;
Expand Down Expand Up @@ -219,7 +220,7 @@ impl Run {

pub async fn run(
&mut self,
experimental_ui_sender: Option<AppSender>,
ui_sender: Option<impl UISender + 'static>,
is_watch: bool,
) -> Result<i32, Error> {
if let Some(subscriber) = self.signal_handler.subscribe() {
Expand Down Expand Up @@ -403,7 +404,7 @@ impl Run {
self.processes.clone(),
&self.repo_root,
global_env,
experimental_ui_sender,
ui_sender,
is_watch,
);

Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::{
};
use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;
use turborepo_ui::{tui, tui::AppSender};
use turborepo_ui::{sender::UISender, tui, tui::AppSender};

use crate::{
cli::{Command, RunArgs},
Expand Down
80 changes: 42 additions & 38 deletions crates/turborepo-lib/src/task_graph/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use turborepo_telemetry::events::{
generic::GenericEventBuilder, task::PackageTaskEventBuilder, EventBuilder, TrackedErrors,
};
use turborepo_ui::{
tui::{self, event::CacheResult, AppSender, TuiTask},
sender::{TaskSender, UISender},
tui::{self, event::CacheResult, AppSender},
wui::WebUISender,
ColorSelector, OutputClient, OutputSink, OutputWriter, PrefixedUI, UI,
};
use which::which;
Expand All @@ -49,7 +51,7 @@ use crate::{
};

// This holds the whole world
pub struct Visitor<'a> {
pub struct Visitor<'a, U> {
color_cache: ColorSelector,
dry: bool,
global_env: EnvironmentVariableMap,
Expand All @@ -64,8 +66,8 @@ pub struct Visitor<'a> {
sink: OutputSink<StdWriter>,
task_hasher: TaskHasher<'a>,
ui: UI,
experimental_ui_sender: Option<AppSender>,
is_watch: bool,
ui_sender: Option<U>,
}

#[derive(Debug, thiserror::Error, Diagnostic)]
Expand Down Expand Up @@ -98,7 +100,7 @@ pub enum Error {
InternalErrors(String),
}

impl<'a> Visitor<'a> {
impl<'a, U: UISender + 'static> Visitor<'a, U> {
// Disabling this lint until we stop adding state to the visitor.
// Once we have the full picture we will go about grouping these pieces of data
// together
Expand All @@ -117,7 +119,7 @@ impl<'a> Visitor<'a> {
manager: ProcessManager,
repo_root: &'a AbsoluteSystemPath,
global_env: EnvironmentVariableMap,
experimental_ui_sender: Option<AppSender>,
ui_sender: Option<U>,
is_watch: bool,
) -> Self {
let task_hasher = TaskHasher::new(
Expand Down Expand Up @@ -145,7 +147,7 @@ impl<'a> Visitor<'a> {
task_hasher,
ui,
global_env,
experimental_ui_sender,
ui_sender,
is_watch,
}
}
Expand Down Expand Up @@ -278,7 +280,7 @@ impl<'a> Visitor<'a> {
let vendor_behavior =
Vendor::infer().and_then(|vendor| vendor.behavior.as_ref());

let output_client = if let Some(handle) = &self.experimental_ui_sender {
let output_client = if let Some(handle) = &self.ui_sender {
TaskOutput::UI(handle.task(info.to_string()))
} else {
TaskOutput::Direct(self.output_client(&info, vendor_behavior))
Expand Down Expand Up @@ -316,7 +318,7 @@ impl<'a> Visitor<'a> {
drop(factory);

if !self.is_watch {
if let Some(handle) = &self.experimental_ui_sender {
if let Some(handle) = &self.ui_sender {
handle.stop();
}
}
Expand Down Expand Up @@ -491,8 +493,8 @@ impl<'a> Visitor<'a> {

pub fn dry_run(&mut self) {
self.dry = true;
// No need to start a TUI on dry run
self.experimental_ui_sender = None;
// No need to start a UI on dry run
self.ui_sender = None;
}
}

Expand Down Expand Up @@ -544,9 +546,9 @@ impl std::io::Write for StdWriter {

/// Small wrapper over our two output types that defines a shared interface for
/// interacting with them.
enum TaskOutput<W> {
enum TaskOutput<W, U> {
Direct(OutputClient<W>),
UI(tui::TuiTask),
UI(TaskSender<U>),
}

fn turbo_regex() -> &'static Regex {
Expand Down Expand Up @@ -624,16 +626,16 @@ impl TaskErrorCause {
}
}

struct ExecContextFactory<'a> {
visitor: &'a Visitor<'a>,
struct ExecContextFactory<'a, U> {
visitor: &'a Visitor<'a, U>,
errors: Arc<Mutex<Vec<TaskError>>>,
manager: ProcessManager,
engine: &'a Arc<Engine>,
}

impl<'a> ExecContextFactory<'a> {
impl<'a, U: UISender> ExecContextFactory<'a, U> {
pub fn new(
visitor: &'a Visitor,
visitor: &'a Visitor<'a, U>,
errors: Arc<Mutex<Vec<TaskError>>>,
manager: ProcessManager,
engine: &'a Arc<Engine>,
Expand Down Expand Up @@ -662,7 +664,7 @@ impl<'a> ExecContextFactory<'a> {
ExecContext {
engine: self.engine.clone(),
ui: self.visitor.ui,
experimental_ui: self.visitor.experimental_ui_sender.is_some(),
experimental_ui: self.visitor.ui_sender.is_some(),
is_github_actions: self.visitor.run_opts.is_github_actions,
pretty_prefix: self
.visitor
Expand Down Expand Up @@ -750,7 +752,7 @@ impl ExecContext {
&mut self,
parent_span_id: Option<tracing::Id>,
tracker: TaskTracker<()>,
output_client: TaskOutput<impl std::io::Write>,
output_client: TaskOutput<impl std::io::Write, impl UISender>,
callback: oneshot::Sender<Result<(), StopExecution>>,
spaces_client: Option<SpacesTaskClient>,
telemetry: &PackageTaskEventBuilder,
Expand Down Expand Up @@ -839,25 +841,27 @@ impl ExecContext {
Ok(())
}

fn prefixed_ui<'a, W: Write>(
fn prefixed_ui<'a, W: Write, U: UISender>(
&self,
output_client: &'a TaskOutput<W>,
) -> TaskCacheOutput<OutputWriter<'a, W>> {
output_client: &'a TaskOutput<W, U>,
) -> TaskCacheOutput<OutputWriter<'a, W>, U> {
match output_client {
TaskOutput::Direct(client) => TaskCacheOutput::Direct(Visitor::prefixed_ui(
self.ui,
self.is_github_actions,
client.stdout(),
client.stderr(),
self.pretty_prefix.clone(),
)),
TaskOutput::Direct(client) => {
TaskCacheOutput::Direct(Visitor::<AppSender>::prefixed_ui(
self.ui,
self.is_github_actions,
client.stdout(),
client.stderr(),
self.pretty_prefix.clone(),
))
}
TaskOutput::UI(task) => TaskCacheOutput::UI(task.clone()),
}
}

async fn execute_inner(
&mut self,
output_client: &TaskOutput<impl std::io::Write>,
output_client: &TaskOutput<impl std::io::Write, impl UISender>,
telemetry: &PackageTaskEventBuilder,
) -> Result<ExecOutcome, InternalError> {
let task_start = Instant::now();
Expand Down Expand Up @@ -1084,13 +1088,13 @@ impl DryRunExecContext {
}

/// Struct for displaying information about task's cache
enum TaskCacheOutput<W> {
enum TaskCacheOutput<W, U> {
Direct(PrefixedUI<W>),
UI(TuiTask),
UI(TaskSender<U>),
}

impl<W: Write> TaskCacheOutput<W> {
fn task_writer(&mut self) -> Either<turborepo_ui::PrefixedWriter<&mut W>, TuiTask> {
impl<W: Write, U: UISender> TaskCacheOutput<W, U> {
fn task_writer(&mut self) -> Either<turborepo_ui::PrefixedWriter<&mut W>, TaskSender<U>> {
match self {
TaskCacheOutput::Direct(prefixed) => Either::Left(prefixed.output_prefixed_writer()),
TaskCacheOutput::UI(task) => Either::Right(task.clone()),
Expand All @@ -1107,7 +1111,7 @@ impl<W: Write> TaskCacheOutput<W> {
}
}

impl<W: Write> CacheOutput for TaskCacheOutput<W> {
impl<W: Write, U: UISender> CacheOutput for TaskCacheOutput<W, U> {
fn status(&mut self, message: &str, result: CacheResult) {
match self {
TaskCacheOutput::Direct(direct) => direct.output(message),
Expand Down Expand Up @@ -1136,7 +1140,7 @@ impl<W: Write> CacheOutput for TaskCacheOutput<W> {
}

/// Struct for displaying information about task
impl<W: Write> TaskOutput<W> {
impl<W: Write, U: UISender> TaskOutput<W, U> {
pub fn finish(self, use_error: bool, is_cache_hit: bool) -> std::io::Result<Option<Vec<u8>>> {
match self {
TaskOutput::Direct(client) => client.finish(use_error),
Expand All @@ -1145,21 +1149,21 @@ impl<W: Write> TaskOutput<W> {
}
}

pub fn stdout(&self) -> Either<OutputWriter<W>, TuiTask> {
pub fn stdout(&self) -> Either<OutputWriter<W>, TaskSender<U>> {
match self {
TaskOutput::Direct(client) => Either::Left(client.stdout()),
TaskOutput::UI(client) => Either::Right(client.clone()),
}
}

pub fn stderr(&self) -> Either<OutputWriter<W>, TuiTask> {
pub fn stderr(&self) -> Either<OutputWriter<W>, TaskSender<U>> {
match self {
TaskOutput::Direct(client) => Either::Left(client.stderr()),
TaskOutput::UI(client) => Either::Right(client.clone()),
}
}

pub fn task_logs(&self) -> Either<OutputWriter<W>, TuiTask> {
pub fn task_logs(&self) -> Either<OutputWriter<W>, TaskSender<U>> {
match self {
TaskOutput::Direct(client) => Either::Left(client.stdout()),
TaskOutput::UI(client) => Either::Right(client.clone()),
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-ui/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod line;
mod logs;
mod output;
mod prefixed;
pub mod sender;
pub mod tui;
pub mod wui;

Expand Down
73 changes: 73 additions & 0 deletions crates/turborepo-ui/src/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::{
io::Write,
sync::{Arc, Mutex},
};

use crate::tui::event::{CacheResult, OutputLogs, TaskResult};

/// Trait to abstract over sending events to either the TUI or the Web UI
pub trait UISender: Clone + Send + Sync + 'static {
type Error;
fn start_task(&self, task: String, output_logs: OutputLogs);
fn end_task(&self, task: String, result: TaskResult);

fn status(&self, task: String, status: String, result: CacheResult);
fn set_stdin(&self, task: String, stdin: Box<dyn std::io::Write + Send>);

fn output(&self, task: String, output: Vec<u8>) -> Result<(), Self::Error>;
fn task(&self, task: String) -> TaskSender<Self>
where
Self: Sized;
fn stop(&self);
fn update_tasks(&self, tasks: Vec<String>) -> Result<(), Self::Error>;
}

#[derive(Debug, Clone)]
pub struct TaskSender<T> {
pub(crate) name: String,
pub(crate) handle: T,
pub(crate) logs: Arc<Mutex<Vec<u8>>>,
}

impl<T: UISender> TaskSender<T> {
/// Access the underlying AppSender
pub fn as_app(&self) -> &T {
&self.handle
}

/// Mark the task as started
pub fn start(&self, output_logs: OutputLogs) {
self.handle.start_task(self.name.clone(), output_logs);
}

/// Mark the task as finished
pub fn succeeded(&self, is_cache_hit: bool) -> Vec<u8> {
if is_cache_hit {
self.finish(TaskResult::CacheHit)
} else {
self.finish(TaskResult::Success)
}
}

/// Mark the task as finished
pub fn failed(&self) -> Vec<u8> {
self.finish(TaskResult::Failure)
}

fn finish(&self, result: TaskResult) -> Vec<u8> {
self.handle.end_task(self.name.clone(), result);
self.logs.lock().expect("logs lock poisoned").clone()
}

pub fn set_stdin(&self, stdin: Box<dyn std::io::Write + Send>) {
self.handle.set_stdin(self.name.clone(), stdin);
}

pub fn status(&self, status: &str, result: CacheResult) {
// Since this will be rendered via ratatui we any ANSI escape codes will not be
// handled.
// TODO: prevent the status from having ANSI codes in this scenario
let status = console::strip_ansi_codes(status).into_owned();
self.handle.status(self.name.clone(), status, result);
}
}
Loading

0 comments on commit 7c152be

Please sign in to comment.