diff --git a/crates/rome_cli/src/execute.rs b/crates/rome_cli/src/execute.rs index 61b1ca16abe..e557b11ea8c 100644 --- a/crates/rome_cli/src/execute.rs +++ b/crates/rome_cli/src/execute.rs @@ -9,7 +9,6 @@ use rome_service::workspace::{ use std::path::PathBuf; /// Useful information during the traversal of files and virtual content -#[derive(Clone)] pub(crate) struct Execution { /// How the information should be collected and reported report_mode: ReportMode, @@ -18,7 +17,6 @@ pub(crate) struct Execution { traversal_mode: TraversalMode, } -#[derive(Clone)] pub(crate) enum TraversalMode { /// This mode is enabled when running the command `rome check` Check { @@ -47,7 +45,7 @@ pub(crate) enum TraversalMode { } /// Tells to the execution of the traversal how the information should be reported -#[derive(Clone, Default)] +#[derive(Copy, Clone, Default)] pub(crate) enum ReportMode { /// Reports information straight to the console, it's the default mode #[default] diff --git a/crates/rome_cli/src/traversal.rs b/crates/rome_cli/src/traversal.rs index dabdc4f2d5a..3ae79e915c1 100644 --- a/crates/rome_cli/src/traversal.rs +++ b/crates/rome_cli/src/traversal.rs @@ -2,7 +2,10 @@ use crate::{ CliSession, Execution, FormatterReportFileDetail, FormatterReportSummary, Report, ReportDiagnostic, ReportDiff, ReportErrorKind, ReportKind, Termination, TraversalMode, }; -use crossbeam::channel::{unbounded, Receiver, Sender}; +use crossbeam::{ + channel::{unbounded, Receiver, Sender}, + select, +}; use rome_console::{markup, Console, ConsoleExt}; use rome_diagnostics::{ file::FileId, @@ -31,13 +34,16 @@ use std::{ panic::catch_unwind, path::{Path, PathBuf}, sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, - Arc, + atomic::{AtomicU16, AtomicUsize, Ordering}, + Once, }, + thread, time::{Duration, Instant}, }; pub(crate) fn traverse(execution: Execution, mut session: CliSession) -> Result<(), Termination> { + init_thread_pool(); + // Check that at least one input file / directory was specified in the command line let mut inputs = vec![]; @@ -68,7 +74,7 @@ pub(crate) fn traverse(execution: Execution, mut session: CliSession) -> Result< let (interner, recv_files) = AtomicInterner::new(); let (send_msgs, recv_msgs) = unbounded(); - let (sender_reports_from_traversal, receiver_reports) = unbounded(); + let (sender_reports, recv_reports) = unbounded(); let processed = AtomicUsize::new(0); let skipped = AtomicUsize::new(0); @@ -77,102 +83,103 @@ pub(crate) fn traverse(execution: Execution, mut session: CliSession) -> Result< let workspace = &*session.app.workspace; let console = &mut *session.app.console; - let mut has_errors = None; - let remaining_diagnostics = Arc::new(AtomicU64::new(u64::MAX)); + // The command `rome check` gives a default value of 20. + // In case of other commands that pass here, we limit to 50 to avoid to delay the terminal. + // Once `--max-diagnostics` will be a global argument, `unwrap_of_default` should be enough. + let max_diagnostics = execution + .get_max_diagnostics() + .unwrap_or(MAXIMUM_DISPLAYABLE_DIAGNOSTICS); - let mut duration = None; - let mut report = None; - let sender_reports_from_messages = sender_reports_from_traversal.clone(); - rayon::scope(|s| { - s.spawn(|_| { - report = Some(collect_reports(receiver_reports)); - }); + let remaining_diagnostics = AtomicU16::new(max_diagnostics); - s.spawn(|_| { - has_errors = Some(process_messages(ProcessMessagesOptions { - execution: execution.clone(), - console, - recv_files, - recv_msgs, - sender_reports: sender_reports_from_messages, - remaining_diagnostics: Arc::clone(&remaining_diagnostics), - })); - }); - s.spawn(|_| { - // The traversal context is scoped to ensure all the channels it - // contains are properly closed once the traversal finishes - duration = Some(traverse_inputs( + let mut has_errors = false; + let mut report = Report::default(); + + let duration = thread::scope(|s| { + thread::Builder::new() + .name(String::from("rome::console")) + .spawn_scoped(s, || { + process_messages(ProcessMessagesOptions { + execution: &execution, + console, + recv_reports, + recv_files, + recv_msgs, + max_diagnostics, + remaining_diagnostics: &remaining_diagnostics, + has_errors: &mut has_errors, + report: &mut report, + }); + }) + .expect("failed to spawn console thread"); + + // The traversal context is scoped to ensure all the channels it + // contains are properly closed once the traversal finishes + traverse_inputs( + fs, + inputs, + &TraversalOptions { fs, - inputs, - &TraversalOptions { - fs, - workspace, - execution: execution.clone(), - interner, - processed: &processed, - skipped: &skipped, - messages: send_msgs, - sender_reports: sender_reports_from_traversal, - remaining_diagnostics: Arc::clone(&remaining_diagnostics), - }, - )); - }) + workspace, + execution: &execution, + interner, + processed: &processed, + skipped: &skipped, + messages: send_msgs, + sender_reports, + remaining_diagnostics: &remaining_diagnostics, + }, + ) }); let count = processed.load(Ordering::Relaxed); let skipped = skipped.load(Ordering::Relaxed); - let to_terminal = execution.should_report_to_terminal(); - - if let Some(duration) = duration { - if to_terminal { - match execution.traversal_mode() { - TraversalMode::Check { .. } => { - if execution.as_fix_file_mode().is_some() { - console.log(markup! { - "Fixed "{count}" files in "{duration} - }); - } else { - console.log(markup! { - "Checked "{count}" files in "{duration} - }); - } - } - TraversalMode::CI { .. } => { + if execution.should_report_to_terminal() { + match execution.traversal_mode() { + TraversalMode::Check { .. } => { + if execution.as_fix_file_mode().is_some() { console.log(markup! { - "Checked "{count}" files in "{duration} + "Fixed "{count}" files in "{duration} }); - } - TraversalMode::Format { write: false, .. } => { - if to_terminal { - console.log(markup! { - "Compared "{count}" files in "{duration} - }); - } - } - TraversalMode::Format { write: true, .. } => { + } else { console.log(markup! { - "Formatted "{count}" files in "{duration} + "Checked "{count}" files in "{duration} }); } } - } else if let Some(mut report) = report { - if let TraversalMode::Format { write, .. } = execution.traversal_mode() { - let mut summary = FormatterReportSummary::default(); - if *write { - summary.set_files_written(count); - } else { - summary.set_files_compared(count); - } - report.set_formatter_summary(summary); + TraversalMode::CI { .. } => { + console.log(markup! { + "Checked "{count}" files in "{duration} + }); + } + TraversalMode::Format { write: false, .. } => { + console.log(markup! { + "Compared "{count}" files in "{duration} + }); + } + TraversalMode::Format { write: true, .. } => { + console.log(markup! { + "Formatted "{count}" files in "{duration} + }); } - - let to_print = report.as_serialized_reports()?; - console.log(markup! { - {to_print} - }); - return Ok(()); } + } else { + if let TraversalMode::Format { write, .. } = execution.traversal_mode() { + let mut summary = FormatterReportSummary::default(); + if *write { + summary.set_files_written(count); + } else { + summary.set_files_compared(count); + } + report.set_formatter_summary(summary); + } + + let to_print = report.as_serialized_reports()?; + console.log(markup! { + {to_print} + }); + return Ok(()); } if skipped > 0 { @@ -182,13 +189,26 @@ pub(crate) fn traverse(execution: Execution, mut session: CliSession) -> Result< } // Processing emitted error diagnostics, exit with a non-zero code - if matches!(has_errors, Some(true)) { + if has_errors { Err(Termination::CheckError) } else { Ok(()) } } +/// This function will setup the global Rayon thread pool the first time it's called +/// +/// This is currently only used to assign friendly debug names to the threads of the pool +fn init_thread_pool() { + static INIT_ONCE: Once = Once::new(); + INIT_ONCE.call_once(|| { + rayon::ThreadPoolBuilder::new() + .thread_name(|index| format!("rome::worker_{index}")) + .build_global() + .expect("failed to initialize the global thread pool"); + }); +} + /// Initiate the filesystem traversal tasks with the provided input paths and /// run it to completion, returning the duration of the process fn traverse_inputs(fs: &dyn FileSystem, inputs: Vec, ctx: &TraversalOptions) -> Duration { @@ -205,18 +225,26 @@ fn traverse_inputs(fs: &dyn FileSystem, inputs: Vec, ctx: &TraversalOp struct ProcessMessagesOptions<'ctx> { /// Execution of the traversal - execution: Execution, + execution: &'ctx Execution, /// Mutable reference to the [console](Console) console: &'ctx mut dyn Console, + /// Receiver channel for reporting statistics + recv_reports: Receiver, /// Receiver channel that expects info when a file is processed recv_files: Receiver<(FileId, PathBuf)>, /// Receiver channel that expects info when a message is sent recv_msgs: Receiver, - /// Sender of reports - sender_reports: Sender, + /// The maximum number of diagnostics the console thread is allowed to print + max_diagnostics: u16, /// The approximate number of diagnostics the console will print before /// folding the rest into the "skipped diagnostics" counter - remaining_diagnostics: Arc, + remaining_diagnostics: &'ctx AtomicU16, + /// Mutable reference to a boolean flag tracking whether the console thread + /// printed any error-level message + has_errors: &'ctx mut bool, + /// Mutable handle to a [Report] instance the console thread should write + /// stats into + report: &'ctx mut Report, } #[derive(Debug, v2::Diagnostic)] @@ -272,30 +300,49 @@ struct TraversalDiagnostic<'a> { /// This thread receives [Message]s from the workers through the `recv_msgs` /// and `recv_files` channels and handles them based on [Execution] -fn process_messages(options: ProcessMessagesOptions) -> bool { +fn process_messages(options: ProcessMessagesOptions) { let ProcessMessagesOptions { execution: mode, console, + recv_reports, recv_files, recv_msgs, - sender_reports, + max_diagnostics, remaining_diagnostics, + has_errors, + report, } = options; - // The command `rome check` gives a default value of 20. - // In case of other commands that pass here, we limit to 50 to avoid to delay the terminal. - // Once `--max-diagnostics` will be a global argument, `unwrap_of_default` should be enough. - let max_diagnostics = mode - .get_max_diagnostics() - .unwrap_or(MAXIMUM_DISPLAYABLE_DIAGNOSTICS); - - let mut has_errors = false; let mut paths = HashMap::new(); let mut printed_diagnostics: u16 = 0; let mut not_printed_diagnostics = 0; let mut total_skipped_suggested_fixes = 0; - while let Ok(msg) = recv_msgs.recv() { + let mut is_msg_open = true; + let mut is_report_open = true; + + while is_msg_open || is_report_open { + let msg = select! { + recv(recv_msgs) -> msg => match msg { + Ok(msg) => msg, + Err(_) => { + is_msg_open = false; + continue; + }, + }, + recv(recv_reports) -> stat => { + match stat { + Ok(stat) => { + report.push_detail_report(stat); + } + Err(_) => { + is_report_open = false; + }, + } + continue; + } + }; + match msg { Message::SkippedFixes { skipped_suggested_fixes, @@ -355,16 +402,14 @@ fn process_messages(options: ProcessMessagesOptions) -> bool { let title = PrintDescription(&err).to_string(); let code = err.category().and_then(|code| code.name().parse().ok()); - sender_reports - .send(ReportKind::Error( - file_name.to_string(), - ReportErrorKind::Diagnostic(ReportDiagnostic { - code, - title, - severity: err.severity(), - }), - )) - .ok(); + report.push_detail_report(ReportKind::Error( + file_name.to_string(), + ReportErrorKind::Diagnostic(ReportDiagnostic { + code, + title, + severity: err.severity(), + }), + )); } } @@ -379,7 +424,7 @@ fn process_messages(options: ProcessMessagesOptions) -> bool { // is CI mode we want to print all the diagnostics if mode.is_ci() { for diag in diagnostics { - has_errors |= diag.severity() == Severity::Error; + *has_errors |= diag.severity() == Severity::Error; let diag = diag.with_file_path(&name).with_file_source_code(&content); console.error(markup! { {PrintDiagnostic(&diag)} @@ -388,13 +433,13 @@ fn process_messages(options: ProcessMessagesOptions) -> bool { } else { for diag in diagnostics { let severity = diag.severity(); - has_errors |= severity == Severity::Error; + *has_errors |= severity == Severity::Error; let should_print = printed_diagnostics < max_diagnostics; if should_print { printed_diagnostics += 1; remaining_diagnostics.store( - u64::from(max_diagnostics.saturating_sub(printed_diagnostics)), + max_diagnostics.saturating_sub(printed_diagnostics), Ordering::Relaxed, ); } else { @@ -410,18 +455,14 @@ fn process_messages(options: ProcessMessagesOptions) -> bool { }); } } else { - sender_reports - .send(ReportKind::Error( - name.to_string(), - ReportErrorKind::Diagnostic(ReportDiagnostic { - code: diag - .category() - .and_then(|code| code.name().parse().ok()), - title: String::from("test here"), - severity, - }), - )) - .ok(); + report.push_detail_report(ReportKind::Error( + name.to_string(), + ReportErrorKind::Diagnostic(ReportDiagnostic { + code: diag.category().and_then(|code| code.name().parse().ok()), + title: String::from("test here"), + severity, + }), + )); } } } @@ -434,7 +475,7 @@ fn process_messages(options: ProcessMessagesOptions) -> bool { } => { if mode.is_ci() { // A diff is an error in CI mode - has_errors = true; + *has_errors = true; } if mode.should_report_to_terminal() { @@ -464,16 +505,14 @@ fn process_messages(options: ProcessMessagesOptions) -> bool { }); } } else { - sender_reports - .send(ReportKind::Error( - file_name, - ReportErrorKind::Diff(ReportDiff { - before: old, - after: new, - severity: Severity::Error, - }), - )) - .ok(); + report.push_detail_report(ReportKind::Error( + file_name, + ReportErrorKind::Diff(ReportDiff { + before: old, + after: new, + severity: Severity::Error, + }), + )); } } } @@ -492,17 +531,6 @@ fn process_messages(options: ProcessMessagesOptions) -> bool { "Diagnostics not shown: "{not_printed_diagnostics}"." }) } - - has_errors -} - -fn collect_reports(receiver: Receiver) -> Report { - let mut report = Report::default(); - while let Ok(stat) = receiver.recv() { - report.push_detail_report(stat); - } - - report } /// Context object shared between directory traversal tasks @@ -512,7 +540,7 @@ struct TraversalOptions<'ctx, 'app> { /// Instance of [Workspace] used by this instance of the CLI workspace: &'ctx dyn Workspace, /// Determines how the files should be processed - execution: Execution, + execution: &'ctx Execution, /// File paths interner used by the filesystem traversal interner: AtomicInterner, /// Shared atomic counter storing the number of processed files @@ -525,7 +553,7 @@ struct TraversalOptions<'ctx, 'app> { sender_reports: Sender, /// The approximate number of diagnostics the console will print before /// folding the rest into the "skipped diagnostics" counter - remaining_diagnostics: Arc, + remaining_diagnostics: &'ctx AtomicU16, } impl<'ctx, 'app> TraversalOptions<'ctx, 'app> { @@ -737,7 +765,7 @@ fn process_file(ctx: &TraversalOptions, path: &Path, file_id: FileId) -> FileRes let max_diagnostics = ctx.remaining_diagnostics.load(Ordering::Relaxed); let result = file_guard - .pull_diagnostics(categories, max_diagnostics) + .pull_diagnostics(categories, max_diagnostics.into()) .with_file_id_and_code(file_id, category!("lint"))?; // In formatting mode, abort immediately if the file has errors