From d3e5afdd371c0dfc399c1fdbcb24898f85661ff8 Mon Sep 17 00:00:00 2001 From: Arcticae Date: Wed, 11 Dec 2024 09:56:56 +0100 Subject: [PATCH 1/4] Track diagnostics worker threads with beacons --- src/lang/diagnostics/mod.rs | 62 +++++++++++++++++++++++++++++++++++-- src/state.rs | 53 +++++++++++++++++++++++++++---- 2 files changed, 106 insertions(+), 9 deletions(-) diff --git a/src/lang/diagnostics/mod.rs b/src/lang/diagnostics/mod.rs index a57c75d..9d03072 100644 --- a/src/lang/diagnostics/mod.rs +++ b/src/lang/diagnostics/mod.rs @@ -2,9 +2,11 @@ use std::collections::HashSet; use std::iter; use std::iter::zip; use std::num::NonZero; -use std::panic::{AssertUnwindSafe, catch_unwind}; +use std::panic::{catch_unwind, AssertUnwindSafe}; +use std::sync::{Arc, Mutex}; use cairo_lang_filesystem::ids::FileId; +use lsp_types::notification::Notification; use lsp_types::Url; use tracing::{error, trace}; @@ -37,25 +39,79 @@ pub struct DiagnosticsController { trigger: trigger::Sender, _thread: JoinHandle, state_snapshots_props: StateSnapshotsProps, + active_snapshots: Arc>>, + notifier: Notifier, } impl DiagnosticsController { /// Creates a new diagnostics controller. pub fn new(notifier: Notifier) -> Self { let (trigger, receiver) = trigger(); - let (thread, parallelism) = DiagnosticsControllerThread::spawn(receiver, notifier); + let (thread, parallelism) = DiagnosticsControllerThread::spawn(receiver, notifier.clone()); Self { trigger, _thread: thread, state_snapshots_props: StateSnapshotsProps { parallelism }, + active_snapshots: Arc::new(Mutex::new(HashSet::default())), + notifier, } } /// Schedules diagnostics refreshing on snapshot(s) of the current state. pub fn refresh(&self, state: &State) { - let state_snapshots = StateSnapshots::new(state, &self.state_snapshots_props); + let mut state_snapshots = StateSnapshots::new(state, &self.state_snapshots_props); + self.register_beacons(&mut state_snapshots); + + DiagnosticsController::notify_start_analysis(self.notifier.clone()); self.trigger.activate(state_snapshots); } + + fn register_beacons(&self, state_snapshots: &mut StateSnapshots) { + let active_snapshots_ref = self.active_snapshots.clone(); + (active_snapshots_ref.lock().unwrap()).clear(); + + state_snapshots.0.iter_mut().enumerate().for_each(|(i, beacon)| { + let mut active_snapshots = active_snapshots_ref.lock().unwrap(); + active_snapshots.insert(i); + + let active_snapshots_ref_2 = self.active_snapshots.clone(); + let notifer_ref = self.notifier.clone(); + beacon.on_drop(move || { + let mut active_snapshots = active_snapshots_ref_2.lock().unwrap(); + + active_snapshots.remove(&i); + if active_snapshots.is_empty() { + DiagnosticsController::notify_stop_analysis(notifer_ref); + } + }); + }); + } + + fn notify_stop_analysis(notifier: Notifier) { + notifier.notify::(()); + } + + fn notify_start_analysis(notifier: Notifier) { + notifier.notify::(()); + } +} + +/// Notifies about diagnostics round which is beginning to calculate +#[derive(Debug)] +pub struct DiagnosticsCalculationStart; + +impl Notification for DiagnosticsCalculationStart { + type Params = (); + const METHOD: &'static str = "cairo/diagnosticsCalculationStart"; +} + +/// Notifies about diagnostics round which ended calulating +#[derive(Debug)] +pub struct DiagnosticsCalculationFinish; + +impl Notification for DiagnosticsCalculationFinish { + type Params = (); + const METHOD: &'static str = "cairo/diagnosticsCalculationFinish"; } /// Stores entire state of diagnostics controller's worker thread. diff --git a/src/state.rs b/src/state.rs index 72e7525..2dfbadd 100644 --- a/src/state.rs +++ b/src/state.rs @@ -2,10 +2,6 @@ use std::collections::HashSet; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use lsp_types::{ClientCapabilities, Url}; -use salsa::ParallelDatabase; - -use crate::Tricks; use crate::config::Config; use crate::lang::db::{AnalysisDatabase, AnalysisDatabaseSwapper}; use crate::lang::diagnostics::DiagnosticsController; @@ -14,6 +10,9 @@ use crate::project::ProjectController; use crate::server::client::Client; use crate::server::connection::ClientSender; use crate::toolchain::scarb::ScarbToolchain; +use crate::Tricks; +use lsp_types::{ClientCapabilities, Url}; +use salsa::ParallelDatabase; /// State of Language server. pub struct State { @@ -55,16 +54,58 @@ impl State { } pub fn snapshot(&self) -> StateSnapshot { - StateSnapshot { + Beacon::wrap(SnapshotInternal { db: self.db.snapshot(), open_files: self.open_files.snapshot(), config: self.config.snapshot(), + }) + } +} + +pub struct Beacon { + value: T, + drop_hook: Option () + Send>>, +} + +impl Beacon +where + T: Send, +{ + // Constructor to wrap a value + pub fn wrap(value: T) -> Self { + Self { value, drop_hook: None } + } + + // Set the drop hook + pub fn on_drop(&mut self, drop_hook: F) + where + F: FnOnce() + Send + Sync + 'static, + { + self.drop_hook = Some(Box::new(drop_hook)); + } +} + +impl Drop for Beacon { + fn drop(&mut self) { + // take the hook, replacing with None + if let Some(hook) = self.drop_hook.take() { + hook(); // call the hook } } } +impl Deref for Beacon { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.value + } +} + +pub type StateSnapshot = Beacon; + /// Readonly snapshot of Language server state. -pub struct StateSnapshot { +pub struct SnapshotInternal { pub db: salsa::Snapshot, pub open_files: Snapshot>, pub config: Snapshot, From 2a2bc0482604578692c97e3a21a2f91d2e4bbd90 Mon Sep 17 00:00:00 2001 From: Arcticae Date: Wed, 18 Dec 2024 12:56:07 +0100 Subject: [PATCH 2/4] Analysis Progress Controller --- src/config.rs | 3 + .../proc_macros/client => }/id_generator.rs | 0 src/ide/analysis_progress.rs | 154 ++++++++++++++++++ src/ide/mod.rs | 1 + src/lang/diagnostics/mod.rs | 86 +++------- src/lang/proc_macros/client/mod.rs | 13 +- src/lang/proc_macros/controller.rs | 11 +- src/lib.rs | 1 + src/server/client.rs | 2 +- src/state.rs | 39 +++-- 10 files changed, 226 insertions(+), 84 deletions(-) rename src/{lang/proc_macros/client => }/id_generator.rs (100%) create mode 100644 src/ide/analysis_progress.rs diff --git a/src/config.rs b/src/config.rs index 96d7def..dddaeb4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -97,6 +97,9 @@ impl Config { response.pop_front().as_ref().and_then(Value::as_bool).unwrap_or_default(); state.config.enable_proc_macros = response.pop_front().as_ref().and_then(Value::as_bool).unwrap_or(false); + state + .analysis_progress_controller + .set_procmacros_enabled(state.config.enable_proc_macros); debug!("reloaded configuration: {:#?}", state.config); diff --git a/src/lang/proc_macros/client/id_generator.rs b/src/id_generator.rs similarity index 100% rename from src/lang/proc_macros/client/id_generator.rs rename to src/id_generator.rs diff --git a/src/ide/analysis_progress.rs b/src/ide/analysis_progress.rs new file mode 100644 index 0000000..42b4ee0 --- /dev/null +++ b/src/ide/analysis_progress.rs @@ -0,0 +1,154 @@ +use std::collections::HashSet; +use std::sync::{Arc, Mutex}; + +use lsp_types::notification::Notification; + +use crate::id_generator::IdGenerator; +use crate::server::client::Notifier; +use crate::state::Beacon; + +/// Controller used to send notifications to the client about analysis progress. +/// Uses information provided from other controllers (diagnostics controller, procmacro controller) +/// to assess if diagnostics are in fact calculated. +#[derive(Debug, Clone)] +pub struct AnalysisProgressController { + notifier: Notifier, + /// ID of the diagnostics "generation" - the scheduled diagnostics jobs set. + /// Used to filter out stale threads finishing when new ones (from newer "generation") + /// are already in progress and being tracked by the controller. + generation_id: Arc>, + /// Sequential IDs of state snapshots from the current generation, used to track their status + /// (present meaning it's still being used) + active_snapshots: Arc>>, + id_generator: Arc, + /// If `true` - a request to procmacro server was submitted, meaning that analysis will extend + /// beyond the current generation of diagnostics. + did_submit_procmacro_request: Arc>, + /// Indicates that a notification was sent and analysis (i.e. macro expansion) is taking place. + analysis_in_progress: Arc>, + /// Loaded asynchronously from config - unset if config was not loaded yet. + /// Has to be set in order for analysis to finish. + procmacros_enabled: Arc>>, +} + +impl AnalysisProgressController { + pub fn new(notifier: Notifier) -> Self { + let id_generator = Arc::new(IdGenerator::default()); + Self { + notifier, + id_generator: id_generator.clone(), + active_snapshots: Arc::new(Mutex::new(HashSet::default())), + did_submit_procmacro_request: Arc::new(Mutex::new(true)), + analysis_in_progress: Arc::new(Mutex::new(false)), + procmacros_enabled: Arc::new(Mutex::new(None)), + generation_id: Arc::new(Mutex::new(id_generator.unique_id())), + } + } + + /// Signals that a request to proc macro server was made during the current generation of + /// diagnostics. + pub fn register_procmacro_request(&self) { + let mut write_guard = self.did_submit_procmacro_request.lock().unwrap(); + *write_guard = true; + } + + /// Allows to set the procmacro configuration to whatever is in the config, upon loading it. + pub fn set_procmacros_enabled(&self, value: bool) { + let mut guard = self.procmacros_enabled.lock().unwrap(); + *guard = Some(value); + } + + /// Sets handlers for tracking beacons sent to threads. + /// The beacons are wrapping snapshots, which are signalling when diagnostics finished + /// calculating for a given snapshot (used for calculating files diagnostics or removing + /// stale ones) + pub fn track_analysis(&self, beacons: &mut [Beacon]) { + let gen_id = self.next_generation_id(); + + self.clear_active_snapshots(); + beacons.iter_mut().enumerate().for_each(|(i, beacon)| { + self.insert_active_snapshot(i); + + let self_ref: AnalysisProgressController = self.clone(); + beacon.on_signal(move || { + let current_gen = self_ref.get_generation_id(); + if current_gen == gen_id { + self_ref.remove_active_snapshot(i); + self_ref.try_stop_analysis(); + } + }); + }); + + self.start_analysis(); + } + + fn insert_active_snapshot(&self, snapshot_id: usize) { + let mut active_snapshots = self.active_snapshots.lock().unwrap(); + active_snapshots.insert(snapshot_id); + } + + fn next_generation_id(&self) -> u64 { + let mut generation_id_guard = self.generation_id.lock().unwrap(); + *generation_id_guard = self.id_generator.unique_id(); + *generation_id_guard + } + + fn get_generation_id(&self) -> u64 { + *self.generation_id.lock().unwrap() + } + + fn remove_active_snapshot(&self, snapshot_id: usize) { + let mut active_snapshots = self.active_snapshots.lock().unwrap(); + active_snapshots.remove(&snapshot_id); + } + + fn clear_active_snapshots(&self) { + let active_snapshots_ref = self.active_snapshots.clone(); + active_snapshots_ref.lock().unwrap().clear(); + } + + /// Starts a next generation of diagnostics, sends a notification + fn start_analysis(&self) { + let mut analysis_in_progress = self.analysis_in_progress.lock().unwrap(); + if !(*analysis_in_progress) { + *analysis_in_progress = true; + self.notifier.notify::(()); + } + } + + /// Checks a bunch of conditions and if they are fulfilled, sends stop notification + /// and resets the state back to start of generation defaults. + fn try_stop_analysis(&self) { + let mut did_submit_procmacro_request = self.did_submit_procmacro_request.lock().unwrap(); + let snapshots_empty = self.active_snapshots.lock().unwrap().is_empty(); + let mut analysis_in_progress = self.analysis_in_progress.lock().unwrap(); + let procmacros_enabled = *self.procmacros_enabled.lock().unwrap(); + + if snapshots_empty + && (!*did_submit_procmacro_request || (procmacros_enabled == Some(false))) + && *analysis_in_progress + { + *analysis_in_progress = false; + *did_submit_procmacro_request = false; + self.notifier.notify::(()); + } + } +} + +/// Notifies about diagnostics generation which is beginning to calculate +#[derive(Debug)] +pub struct DiagnosticsCalculationStart; + +impl Notification for DiagnosticsCalculationStart { + type Params = (); + const METHOD: &'static str = "cairo/diagnosticsCalculationStart"; +} + +/// Notifies about diagnostics generation which ended calculating +#[derive(Debug)] +pub struct DiagnosticsCalculationFinish; + +impl Notification for DiagnosticsCalculationFinish { + type Params = (); + const METHOD: &'static str = "cairo/diagnosticsCalculationFinish"; +} diff --git a/src/ide/mod.rs b/src/ide/mod.rs index 5d22217..245ac81 100644 --- a/src/ide/mod.rs +++ b/src/ide/mod.rs @@ -1,3 +1,4 @@ +pub mod analysis_progress; pub mod code_actions; pub mod completion; pub mod formatter; diff --git a/src/lang/diagnostics/mod.rs b/src/lang/diagnostics/mod.rs index 9d03072..b59f43d 100644 --- a/src/lang/diagnostics/mod.rs +++ b/src/lang/diagnostics/mod.rs @@ -2,17 +2,16 @@ use std::collections::HashSet; use std::iter; use std::iter::zip; use std::num::NonZero; -use std::panic::{catch_unwind, AssertUnwindSafe}; -use std::sync::{Arc, Mutex}; +use std::panic::{AssertUnwindSafe, catch_unwind}; use cairo_lang_filesystem::ids::FileId; -use lsp_types::notification::Notification; use lsp_types::Url; use tracing::{error, trace}; use self::project_diagnostics::ProjectDiagnostics; use self::refresh::{clear_old_diagnostics, refresh_diagnostics}; use self::trigger::trigger; +use crate::ide::analysis_progress::AnalysisProgressController; use crate::lang::diagnostics::file_batches::{batches, find_primary_files, find_secondary_files}; use crate::lang::lsp::LsProtoGroup; use crate::server::client::Notifier; @@ -39,87 +38,41 @@ pub struct DiagnosticsController { trigger: trigger::Sender, _thread: JoinHandle, state_snapshots_props: StateSnapshotsProps, - active_snapshots: Arc>>, - notifier: Notifier, } impl DiagnosticsController { /// Creates a new diagnostics controller. - pub fn new(notifier: Notifier) -> Self { + pub fn new( + notifier: Notifier, + analysis_progress_controller: AnalysisProgressController, + ) -> Self { let (trigger, receiver) = trigger(); - let (thread, parallelism) = DiagnosticsControllerThread::spawn(receiver, notifier.clone()); + let (thread, parallelism) = DiagnosticsControllerThread::spawn( + receiver, + notifier.clone(), + analysis_progress_controller, + ); + Self { trigger, _thread: thread, state_snapshots_props: StateSnapshotsProps { parallelism }, - active_snapshots: Arc::new(Mutex::new(HashSet::default())), - notifier, } } /// Schedules diagnostics refreshing on snapshot(s) of the current state. pub fn refresh(&self, state: &State) { - let mut state_snapshots = StateSnapshots::new(state, &self.state_snapshots_props); - self.register_beacons(&mut state_snapshots); - - DiagnosticsController::notify_start_analysis(self.notifier.clone()); - self.trigger.activate(state_snapshots); - } - - fn register_beacons(&self, state_snapshots: &mut StateSnapshots) { - let active_snapshots_ref = self.active_snapshots.clone(); - (active_snapshots_ref.lock().unwrap()).clear(); - - state_snapshots.0.iter_mut().enumerate().for_each(|(i, beacon)| { - let mut active_snapshots = active_snapshots_ref.lock().unwrap(); - active_snapshots.insert(i); - - let active_snapshots_ref_2 = self.active_snapshots.clone(); - let notifer_ref = self.notifier.clone(); - beacon.on_drop(move || { - let mut active_snapshots = active_snapshots_ref_2.lock().unwrap(); - - active_snapshots.remove(&i); - if active_snapshots.is_empty() { - DiagnosticsController::notify_stop_analysis(notifer_ref); - } - }); - }); - } - - fn notify_stop_analysis(notifier: Notifier) { - notifier.notify::(()); - } - - fn notify_start_analysis(notifier: Notifier) { - notifier.notify::(()); + self.trigger.activate(StateSnapshots::new(state, &self.state_snapshots_props)); } } -/// Notifies about diagnostics round which is beginning to calculate -#[derive(Debug)] -pub struct DiagnosticsCalculationStart; - -impl Notification for DiagnosticsCalculationStart { - type Params = (); - const METHOD: &'static str = "cairo/diagnosticsCalculationStart"; -} - -/// Notifies about diagnostics round which ended calulating -#[derive(Debug)] -pub struct DiagnosticsCalculationFinish; - -impl Notification for DiagnosticsCalculationFinish { - type Params = (); - const METHOD: &'static str = "cairo/diagnosticsCalculationFinish"; -} - /// Stores entire state of diagnostics controller's worker thread. struct DiagnosticsControllerThread { receiver: trigger::Receiver, notifier: Notifier, pool: thread::Pool, project_diagnostics: ProjectDiagnostics, + analysis_progress_controller: AnalysisProgressController, } impl DiagnosticsControllerThread { @@ -128,10 +81,12 @@ impl DiagnosticsControllerThread { fn spawn( receiver: trigger::Receiver, notifier: Notifier, + analysis_progress_controller: AnalysisProgressController, ) -> (JoinHandle, NonZero) { let this = Self { receiver, notifier, + analysis_progress_controller, pool: thread::Pool::new(), project_diagnostics: ProjectDiagnostics::new(), }; @@ -148,7 +103,8 @@ impl DiagnosticsControllerThread { /// Runs diagnostics controller's event loop. fn event_loop(&self) { - while let Some(state_snapshots) = self.receiver.wait() { + while let Some(mut state_snapshots) = self.receiver.wait() { + self.analysis_progress_controller.track_analysis(&mut state_snapshots.0); if let Err(err) = catch_unwind(AssertUnwindSafe(|| { self.diagnostics_controller_tick(state_snapshots); })) { @@ -165,7 +121,7 @@ impl DiagnosticsControllerThread { /// Runs a single tick of the diagnostics controller's event loop. #[tracing::instrument(skip_all)] fn diagnostics_controller_tick(&self, state_snapshots: StateSnapshots) { - let (state, primary_snapshots, secondary_snapshots) = state_snapshots.split(); + let (mut state, primary_snapshots, secondary_snapshots) = state_snapshots.split(); let primary_set = find_primary_files(&state.db, &state.open_files); let primary: Vec<_> = primary_set.iter().copied().collect(); @@ -182,6 +138,7 @@ impl DiagnosticsControllerThread { self.spawn_worker(move |project_diagnostics, notifier| { clear_old_diagnostics(files_to_preserve, project_diagnostics, notifier); + state.signal(); }); } @@ -206,7 +163,7 @@ impl DiagnosticsControllerThread { fn spawn_refresh_worker(&self, files: &[FileId], state_snapshots: Vec) { let files_batches = batches(files, self.pool.parallelism()); assert_eq!(files_batches.len(), state_snapshots.len()); - for (batch, state) in zip(files_batches, state_snapshots) { + for (batch, mut state) in zip(files_batches, state_snapshots) { self.spawn_worker(move |project_diagnostics, notifier| { refresh_diagnostics( &state.db, @@ -215,6 +172,7 @@ impl DiagnosticsControllerThread { project_diagnostics, notifier, ); + state.signal(); }); } } diff --git a/src/lang/proc_macros/client/mod.rs b/src/lang/proc_macros/client/mod.rs index 7a0cdb9..0f0ebdd 100644 --- a/src/lang/proc_macros/client/mod.rs +++ b/src/lang/proc_macros/client/mod.rs @@ -16,8 +16,10 @@ use scarb_proc_macro_server_types::methods::expand::{ pub use status::ClientStatus; use tracing::error; +use crate::id_generator; +use crate::ide::analysis_progress::AnalysisProgressController; + pub mod connection; -mod id_generator; pub mod status; #[derive(Debug)] @@ -33,15 +35,21 @@ pub struct ProcMacroClient { id_generator: id_generator::IdGenerator, requests_params: Mutex>, error_channel: Sender<()>, + analysis_progress_controller: AnalysisProgressController, } impl ProcMacroClient { - pub fn new(connection: ProcMacroServerConnection, error_channel: Sender<()>) -> Self { + pub fn new( + connection: ProcMacroServerConnection, + error_channel: Sender<()>, + analysis_progress_controller: AnalysisProgressController, + ) -> Self { Self { connection, id_generator: Default::default(), requests_params: Default::default(), error_channel, + analysis_progress_controller, } } @@ -142,6 +150,7 @@ impl ProcMacroClient { match self.send_request_untracked::(id, ¶ms) { Ok(()) => { requests_params.insert(id, map(params)); + self.analysis_progress_controller.register_procmacro_request(); } Err(err) => { error!("Sending request to proc-macro-server failed: {err:?}"); diff --git a/src/lang/proc_macros/controller.rs b/src/lang/proc_macros/controller.rs index 15e88b0..7c83d54 100644 --- a/src/lang/proc_macros/controller.rs +++ b/src/lang/proc_macros/controller.rs @@ -19,6 +19,7 @@ use super::client::connection::ProcMacroServerConnection; use super::client::status::ClientStatus; use super::client::{ProcMacroClient, RequestParams}; use crate::config::Config; +use crate::ide::analysis_progress::AnalysisProgressController; use crate::lang::db::AnalysisDatabase; use crate::lang::proc_macros::db::ProcMacroGroup; use crate::lang::proc_macros::plugins::proc_macro_plugin_suite; @@ -53,6 +54,7 @@ pub struct ProcMacroClientController { plugin_suite: Option, initialization_retries: RateLimiter, channels: ProcMacroChannels, + analysis_progress_controller: AnalysisProgressController, } impl ProcMacroClientController { @@ -60,10 +62,15 @@ impl ProcMacroClientController { self.channels.clone() } - pub fn new(scarb: ScarbToolchain, notifier: Notifier) -> Self { + pub fn new( + scarb: ScarbToolchain, + notifier: Notifier, + analysis_progress_controller: AnalysisProgressController, + ) -> Self { Self { scarb, notifier, + analysis_progress_controller, plugin_suite: Default::default(), initialization_retries: RateLimiter::direct( Quota::with_period(Duration::from_secs( @@ -170,10 +177,10 @@ impl ProcMacroClientController { self.channels.response_sender.clone(), ), self.channels.error_sender.clone(), + self.analysis_progress_controller.clone(), ); client.start_initialize(); - db.set_proc_macro_client_status(ClientStatus::Starting(Arc::new(client))); } Err(err) => { diff --git a/src/lib.rs b/src/lib.rs index 4ae8b49..b74b377 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,6 +71,7 @@ use crate::state::State; mod config; mod env_config; +mod id_generator; mod ide; mod lang; pub mod lsp; diff --git a/src/server/client.rs b/src/server/client.rs index c1637d9..e58292a 100644 --- a/src/server/client.rs +++ b/src/server/client.rs @@ -26,7 +26,7 @@ pub struct Client<'s> { pub(super) requester: Requester<'s>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Notifier(ClientSender); #[derive(Clone)] diff --git a/src/state.rs b/src/state.rs index 2dfbadd..ee709be 100644 --- a/src/state.rs +++ b/src/state.rs @@ -2,7 +2,12 @@ use std::collections::HashSet; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use lsp_types::{ClientCapabilities, Url}; +use salsa::ParallelDatabase; + +use crate::Tricks; use crate::config::Config; +use crate::ide::analysis_progress::AnalysisProgressController; use crate::lang::db::{AnalysisDatabase, AnalysisDatabaseSwapper}; use crate::lang::diagnostics::DiagnosticsController; use crate::lang::proc_macros::controller::ProcMacroClientController; @@ -10,9 +15,6 @@ use crate::project::ProjectController; use crate::server::client::Client; use crate::server::connection::ClientSender; use crate::toolchain::scarb::ScarbToolchain; -use crate::Tricks; -use lsp_types::{ClientCapabilities, Url}; -use salsa::ParallelDatabase; /// State of Language server. pub struct State { @@ -26,6 +28,7 @@ pub struct State { pub diagnostics_controller: DiagnosticsController, pub proc_macro_controller: ProcMacroClientController, pub project_controller: ProjectController, + pub analysis_progress_controller: AnalysisProgressController, } impl State { @@ -36,8 +39,16 @@ impl State { ) -> Self { let notifier = Client::new(sender).notifier(); let scarb_toolchain = ScarbToolchain::new(notifier.clone()); - let proc_macro_controller = - ProcMacroClientController::new(scarb_toolchain.clone(), notifier.clone()); + + let analysis_progress_controller = AnalysisProgressController::new(notifier.clone()); + let proc_macro_controller = ProcMacroClientController::new( + scarb_toolchain.clone(), + notifier.clone(), + analysis_progress_controller.clone(), + ); + + let diagnostics_controller = + DiagnosticsController::new(notifier.clone(), analysis_progress_controller.clone()); Self { db: AnalysisDatabase::new(&tricks), @@ -47,7 +58,8 @@ impl State { scarb_toolchain: scarb_toolchain.clone(), db_swapper: AnalysisDatabaseSwapper::new(), tricks: Owned::new(tricks.into()), - diagnostics_controller: DiagnosticsController::new(notifier.clone()), + diagnostics_controller, + analysis_progress_controller, proc_macro_controller, project_controller: ProjectController::initialize(scarb_toolchain, notifier), } @@ -64,7 +76,7 @@ impl State { pub struct Beacon { value: T, - drop_hook: Option () + Send>>, + signal_hook: Option>, } impl Beacon @@ -73,22 +85,19 @@ where { // Constructor to wrap a value pub fn wrap(value: T) -> Self { - Self { value, drop_hook: None } + Self { value, signal_hook: None } } // Set the drop hook - pub fn on_drop(&mut self, drop_hook: F) + pub fn on_signal(&mut self, drop_hook: F) where F: FnOnce() + Send + Sync + 'static, { - self.drop_hook = Some(Box::new(drop_hook)); + self.signal_hook = Some(Box::new(drop_hook)); } -} -impl Drop for Beacon { - fn drop(&mut self) { - // take the hook, replacing with None - if let Some(hook) = self.drop_hook.take() { + pub fn signal(&mut self) { + if let Some(hook) = self.signal_hook.take() { hook(); // call the hook } } From 16060995127bdc88a168ca8a2bad07e11aeb451a Mon Sep 17 00:00:00 2001 From: Tomasz Rejowski <34059895+Arcticae@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:00:45 +0100 Subject: [PATCH 3/4] Update src/lang/diagnostics/mod.rs Co-authored-by: Piotr Figiela <77412592+Draggu@users.noreply.github.com> Signed-off-by: Tomasz Rejowski <34059895+Arcticae@users.noreply.github.com> --- src/lang/diagnostics/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lang/diagnostics/mod.rs b/src/lang/diagnostics/mod.rs index b59f43d..b8f692e 100644 --- a/src/lang/diagnostics/mod.rs +++ b/src/lang/diagnostics/mod.rs @@ -49,7 +49,7 @@ impl DiagnosticsController { let (trigger, receiver) = trigger(); let (thread, parallelism) = DiagnosticsControllerThread::spawn( receiver, - notifier.clone(), + notifier, analysis_progress_controller, ); From 935e0ad2f3dcc262a42d7ccca06db748885f7b7e Mon Sep 17 00:00:00 2001 From: Arcticae Date: Thu, 19 Dec 2024 18:02:23 +0100 Subject: [PATCH 4/4] Review suggestions --- src/ide/analysis_progress.rs | 132 +++++++++++++++++------------ src/lang/diagnostics/mod.rs | 32 ++++--- src/lang/proc_macros/client/mod.rs | 23 +++-- src/lang/proc_macros/controller.rs | 10 +-- src/state.rs | 47 +++++----- 5 files changed, 136 insertions(+), 108 deletions(-) diff --git a/src/ide/analysis_progress.rs b/src/ide/analysis_progress.rs index 42b4ee0..64d8db4 100644 --- a/src/ide/analysis_progress.rs +++ b/src/ide/analysis_progress.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use lsp_types::notification::Notification; @@ -7,6 +8,40 @@ use crate::id_generator::IdGenerator; use crate::server::client::Notifier; use crate::state::Beacon; +/// A facade for `AnalysisProgressController` that allows to track progress of diagnostics +/// generation and procmacro requests. +#[derive(Clone)] +pub struct AnalysisProgressTracker { + controller: AnalysisProgressController, +} + +impl AnalysisProgressTracker { + /// Signals that a request to proc macro server was made during the current generation of + /// diagnostics. + pub fn register_procmacro_request(&self) { + self.controller.set_did_submit_procmacro_request(true); + } + + /// Sets handlers for tracking beacons sent to threads. + /// The beacons are wrapping snapshots, which are signalling when diagnostics finished + /// calculating for a given snapshot (used for calculating files diagnostics or removing + /// stale ones) + pub fn track_analysis<'a>(&self, beacons: impl Iterator) { + let gen_id = self.controller.next_generation_id(); + + self.controller.clear_active_snapshots(); + + beacons.enumerate().for_each(|(i, beacon)| { + self.controller.insert_active_snapshot(i); + + let controller_ref: AnalysisProgressController = self.controller.clone(); + beacon.on_signal(move || controller_ref.on_snapshot_deactivate(gen_id, i)); + }); + + self.controller.start_analysis(); + } +} + /// Controller used to send notifications to the client about analysis progress. /// Uses information provided from other controllers (diagnostics controller, procmacro controller) /// to assess if diagnostics are in fact calculated. @@ -16,40 +51,41 @@ pub struct AnalysisProgressController { /// ID of the diagnostics "generation" - the scheduled diagnostics jobs set. /// Used to filter out stale threads finishing when new ones (from newer "generation") /// are already in progress and being tracked by the controller. - generation_id: Arc>, + generation_id: Arc, /// Sequential IDs of state snapshots from the current generation, used to track their status /// (present meaning it's still being used) active_snapshots: Arc>>, id_generator: Arc, /// If `true` - a request to procmacro server was submitted, meaning that analysis will extend /// beyond the current generation of diagnostics. - did_submit_procmacro_request: Arc>, + did_submit_procmacro_request: Arc, /// Indicates that a notification was sent and analysis (i.e. macro expansion) is taking place. - analysis_in_progress: Arc>, + analysis_in_progress: Arc, /// Loaded asynchronously from config - unset if config was not loaded yet. /// Has to be set in order for analysis to finish. procmacros_enabled: Arc>>, } impl AnalysisProgressController { + pub fn tracker(&self) -> AnalysisProgressTracker { + AnalysisProgressTracker { controller: self.clone() } + } + pub fn new(notifier: Notifier) -> Self { let id_generator = Arc::new(IdGenerator::default()); Self { notifier, id_generator: id_generator.clone(), active_snapshots: Arc::new(Mutex::new(HashSet::default())), - did_submit_procmacro_request: Arc::new(Mutex::new(true)), - analysis_in_progress: Arc::new(Mutex::new(false)), + did_submit_procmacro_request: Arc::new(AtomicBool::new(false)), + analysis_in_progress: Arc::new(AtomicBool::new(false)), procmacros_enabled: Arc::new(Mutex::new(None)), - generation_id: Arc::new(Mutex::new(id_generator.unique_id())), + generation_id: Arc::new(AtomicU64::new(id_generator.unique_id())), } } - /// Signals that a request to proc macro server was made during the current generation of - /// diagnostics. - pub fn register_procmacro_request(&self) { - let mut write_guard = self.did_submit_procmacro_request.lock().unwrap(); - *write_guard = true; + pub fn set_did_submit_procmacro_request(&self, value: bool) { + self.did_submit_procmacro_request.store(value, Ordering::SeqCst); } /// Allows to set the procmacro configuration to whatever is in the config, upon loading it. @@ -58,60 +94,49 @@ impl AnalysisProgressController { *guard = Some(value); } - /// Sets handlers for tracking beacons sent to threads. - /// The beacons are wrapping snapshots, which are signalling when diagnostics finished - /// calculating for a given snapshot (used for calculating files diagnostics or removing - /// stale ones) - pub fn track_analysis(&self, beacons: &mut [Beacon]) { - let gen_id = self.next_generation_id(); - - self.clear_active_snapshots(); - beacons.iter_mut().enumerate().for_each(|(i, beacon)| { - self.insert_active_snapshot(i); - - let self_ref: AnalysisProgressController = self.clone(); - beacon.on_signal(move || { - let current_gen = self_ref.get_generation_id(); - if current_gen == gen_id { - self_ref.remove_active_snapshot(i); - self_ref.try_stop_analysis(); - } - }); - }); - - self.start_analysis(); - } - - fn insert_active_snapshot(&self, snapshot_id: usize) { + pub fn insert_active_snapshot(&self, snapshot_id: usize) { let mut active_snapshots = self.active_snapshots.lock().unwrap(); active_snapshots.insert(snapshot_id); } - fn next_generation_id(&self) -> u64 { - let mut generation_id_guard = self.generation_id.lock().unwrap(); - *generation_id_guard = self.id_generator.unique_id(); - *generation_id_guard + pub fn on_snapshot_deactivate(&self, snapshot_gen_id: u64, snapshot_id: usize) { + let current_gen = self.get_generation_id(); + if current_gen == snapshot_gen_id { + self.remove_active_snapshot(snapshot_id); + self.try_stop_analysis(); + } + } + + pub fn next_generation_id(&self) -> u64 { + let new_gen_id = self.id_generator.unique_id(); + self.generation_id.store(new_gen_id, Ordering::SeqCst); + new_gen_id } - fn get_generation_id(&self) -> u64 { - *self.generation_id.lock().unwrap() + pub fn get_generation_id(&self) -> u64 { + self.generation_id.load(Ordering::SeqCst) } - fn remove_active_snapshot(&self, snapshot_id: usize) { + pub fn remove_active_snapshot(&self, snapshot_id: usize) { let mut active_snapshots = self.active_snapshots.lock().unwrap(); active_snapshots.remove(&snapshot_id); } - fn clear_active_snapshots(&self) { + pub fn clear_active_snapshots(&self) { let active_snapshots_ref = self.active_snapshots.clone(); active_snapshots_ref.lock().unwrap().clear(); } /// Starts a next generation of diagnostics, sends a notification fn start_analysis(&self) { - let mut analysis_in_progress = self.analysis_in_progress.lock().unwrap(); - if !(*analysis_in_progress) { - *analysis_in_progress = true; + let analysis_in_progress = self.analysis_in_progress.load(Ordering::SeqCst); + let config_loaded = self.procmacros_enabled.lock().unwrap().is_some(); + // We want to clear this flag always when starting a new generation to track the requests + // properly + self.did_submit_procmacro_request.store(false, Ordering::SeqCst); + + if !analysis_in_progress && config_loaded { + self.analysis_in_progress.store(true, Ordering::SeqCst); self.notifier.notify::(()); } } @@ -119,17 +144,18 @@ impl AnalysisProgressController { /// Checks a bunch of conditions and if they are fulfilled, sends stop notification /// and resets the state back to start of generation defaults. fn try_stop_analysis(&self) { - let mut did_submit_procmacro_request = self.did_submit_procmacro_request.lock().unwrap(); + let did_submit_procmacro_request = self.did_submit_procmacro_request.load(Ordering::SeqCst); let snapshots_empty = self.active_snapshots.lock().unwrap().is_empty(); - let mut analysis_in_progress = self.analysis_in_progress.lock().unwrap(); + let analysis_in_progress = self.analysis_in_progress.load(Ordering::SeqCst); let procmacros_enabled = *self.procmacros_enabled.lock().unwrap(); if snapshots_empty - && (!*did_submit_procmacro_request || (procmacros_enabled == Some(false))) - && *analysis_in_progress + && (!did_submit_procmacro_request || (procmacros_enabled == Some(false))) + && analysis_in_progress { - *analysis_in_progress = false; - *did_submit_procmacro_request = false; + self.did_submit_procmacro_request.store(false, Ordering::SeqCst); + self.analysis_in_progress.store(false, Ordering::SeqCst); + self.notifier.notify::(()); } } diff --git a/src/lang/diagnostics/mod.rs b/src/lang/diagnostics/mod.rs index b8f692e..d001188 100644 --- a/src/lang/diagnostics/mod.rs +++ b/src/lang/diagnostics/mod.rs @@ -11,13 +11,13 @@ use tracing::{error, trace}; use self::project_diagnostics::ProjectDiagnostics; use self::refresh::{clear_old_diagnostics, refresh_diagnostics}; use self::trigger::trigger; -use crate::ide::analysis_progress::AnalysisProgressController; +use crate::ide::analysis_progress::AnalysisProgressTracker; use crate::lang::diagnostics::file_batches::{batches, find_primary_files, find_secondary_files}; use crate::lang::lsp::LsProtoGroup; use crate::server::client::Notifier; use crate::server::panic::cancelled_anyhow; use crate::server::schedule::thread::{self, JoinHandle, ThreadPriority}; -use crate::state::{State, StateSnapshot}; +use crate::state::{Beacon, State, StateSnapshot}; mod file_batches; mod file_diagnostics; @@ -42,16 +42,10 @@ pub struct DiagnosticsController { impl DiagnosticsController { /// Creates a new diagnostics controller. - pub fn new( - notifier: Notifier, - analysis_progress_controller: AnalysisProgressController, - ) -> Self { + pub fn new(notifier: Notifier, analysis_progress_tracker: AnalysisProgressTracker) -> Self { let (trigger, receiver) = trigger(); - let (thread, parallelism) = DiagnosticsControllerThread::spawn( - receiver, - notifier, - analysis_progress_controller, - ); + let (thread, parallelism) = + DiagnosticsControllerThread::spawn(receiver, notifier, analysis_progress_tracker); Self { trigger, @@ -72,7 +66,7 @@ struct DiagnosticsControllerThread { notifier: Notifier, pool: thread::Pool, project_diagnostics: ProjectDiagnostics, - analysis_progress_controller: AnalysisProgressController, + analysis_progress_tracker: AnalysisProgressTracker, } impl DiagnosticsControllerThread { @@ -81,12 +75,12 @@ impl DiagnosticsControllerThread { fn spawn( receiver: trigger::Receiver, notifier: Notifier, - analysis_progress_controller: AnalysisProgressController, + analysis_progress_tracker: AnalysisProgressTracker, ) -> (JoinHandle, NonZero) { let this = Self { receiver, notifier, - analysis_progress_controller, + analysis_progress_tracker, pool: thread::Pool::new(), project_diagnostics: ProjectDiagnostics::new(), }; @@ -104,7 +98,7 @@ impl DiagnosticsControllerThread { /// Runs diagnostics controller's event loop. fn event_loop(&self) { while let Some(mut state_snapshots) = self.receiver.wait() { - self.analysis_progress_controller.track_analysis(&mut state_snapshots.0); + self.analysis_progress_tracker.track_analysis(&mut state_snapshots.beacons()); if let Err(err) = catch_unwind(AssertUnwindSafe(|| { self.diagnostics_controller_tick(state_snapshots); })) { @@ -138,7 +132,7 @@ impl DiagnosticsControllerThread { self.spawn_worker(move |project_diagnostics, notifier| { clear_old_diagnostics(files_to_preserve, project_diagnostics, notifier); - state.signal(); + state.signal_finish(); }); } @@ -172,7 +166,7 @@ impl DiagnosticsControllerThread { project_diagnostics, notifier, ); - state.signal(); + state.signal_finish(); }); } } @@ -204,6 +198,10 @@ impl StateSnapshots { let secondary = snapshots.split_off(snapshots.len() / 2); (control, snapshots, secondary) } + + fn beacons(&mut self) -> impl Iterator { + self.0.iter_mut().map(|snapshot| &mut snapshot.beacon) + } } /// Stores necessary properties for creating [`StateSnapshots`]. diff --git a/src/lang/proc_macros/client/mod.rs b/src/lang/proc_macros/client/mod.rs index 0f0ebdd..151f42b 100644 --- a/src/lang/proc_macros/client/mod.rs +++ b/src/lang/proc_macros/client/mod.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, VecDeque}; +use std::fmt::{Debug, Formatter}; use std::sync::{Mutex, MutexGuard}; use anyhow::{Context, Result, anyhow, ensure}; @@ -17,7 +18,7 @@ pub use status::ClientStatus; use tracing::error; use crate::id_generator; -use crate::ide::analysis_progress::AnalysisProgressController; +use crate::ide::analysis_progress::AnalysisProgressTracker; pub mod connection; pub mod status; @@ -29,27 +30,26 @@ pub enum RequestParams { Inline(ExpandInlineMacroParams), } -#[derive(Debug)] pub struct ProcMacroClient { connection: ProcMacroServerConnection, id_generator: id_generator::IdGenerator, requests_params: Mutex>, error_channel: Sender<()>, - analysis_progress_controller: AnalysisProgressController, + analysis_progress_tracker: AnalysisProgressTracker, } impl ProcMacroClient { pub fn new( connection: ProcMacroServerConnection, error_channel: Sender<()>, - analysis_progress_controller: AnalysisProgressController, + analysis_progress_tracker: AnalysisProgressTracker, ) -> Self { Self { connection, id_generator: Default::default(), requests_params: Default::default(), error_channel, - analysis_progress_controller, + analysis_progress_tracker, } } @@ -150,7 +150,7 @@ impl ProcMacroClient { match self.send_request_untracked::(id, ¶ms) { Ok(()) => { requests_params.insert(id, map(params)); - self.analysis_progress_controller.register_procmacro_request(); + self.analysis_progress_tracker.register_procmacro_request(); } Err(err) => { error!("Sending request to proc-macro-server failed: {err:?}"); @@ -165,6 +165,17 @@ impl ProcMacroClient { } } +impl Debug for ProcMacroClient { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ProcMacroClient") + .field("connection", &self.connection) + .field("id_generator", &self.id_generator) + .field("requests_params", &self.requests_params) + .field("error_channel", &self.error_channel) + .finish() + } +} + pub struct Responses<'a> { responses: MutexGuard<'a, VecDeque>, requests: MutexGuard<'a, HashMap>, diff --git a/src/lang/proc_macros/controller.rs b/src/lang/proc_macros/controller.rs index 7c83d54..70c9bb2 100644 --- a/src/lang/proc_macros/controller.rs +++ b/src/lang/proc_macros/controller.rs @@ -19,7 +19,7 @@ use super::client::connection::ProcMacroServerConnection; use super::client::status::ClientStatus; use super::client::{ProcMacroClient, RequestParams}; use crate::config::Config; -use crate::ide::analysis_progress::AnalysisProgressController; +use crate::ide::analysis_progress::AnalysisProgressTracker; use crate::lang::db::AnalysisDatabase; use crate::lang::proc_macros::db::ProcMacroGroup; use crate::lang::proc_macros::plugins::proc_macro_plugin_suite; @@ -54,7 +54,7 @@ pub struct ProcMacroClientController { plugin_suite: Option, initialization_retries: RateLimiter, channels: ProcMacroChannels, - analysis_progress_controller: AnalysisProgressController, + analysis_progress_tracker: AnalysisProgressTracker, } impl ProcMacroClientController { @@ -65,12 +65,12 @@ impl ProcMacroClientController { pub fn new( scarb: ScarbToolchain, notifier: Notifier, - analysis_progress_controller: AnalysisProgressController, + analysis_progress_tracker: AnalysisProgressTracker, ) -> Self { Self { scarb, notifier, - analysis_progress_controller, + analysis_progress_tracker, plugin_suite: Default::default(), initialization_retries: RateLimiter::direct( Quota::with_period(Duration::from_secs( @@ -177,7 +177,7 @@ impl ProcMacroClientController { self.channels.response_sender.clone(), ), self.channels.error_sender.clone(), - self.analysis_progress_controller.clone(), + self.analysis_progress_tracker.clone(), ); client.start_initialize(); diff --git a/src/state.rs b/src/state.rs index ee709be..9b32f42 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::default::Default; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -44,11 +45,11 @@ impl State { let proc_macro_controller = ProcMacroClientController::new( scarb_toolchain.clone(), notifier.clone(), - analysis_progress_controller.clone(), + analysis_progress_controller.tracker(), ); let diagnostics_controller = - DiagnosticsController::new(notifier.clone(), analysis_progress_controller.clone()); + DiagnosticsController::new(notifier.clone(), analysis_progress_controller.tracker()); Self { db: AnalysisDatabase::new(&tricks), @@ -66,28 +67,22 @@ impl State { } pub fn snapshot(&self) -> StateSnapshot { - Beacon::wrap(SnapshotInternal { + StateSnapshot { db: self.db.snapshot(), open_files: self.open_files.snapshot(), config: self.config.snapshot(), - }) + beacon: Default::default(), + } } } - -pub struct Beacon { - value: T, +/// Struct which allows setting a callback - which can be triggered afterward +/// by the function which has the reference. +#[derive(Default)] +pub struct Beacon { signal_hook: Option>, } -impl Beacon -where - T: Send, -{ - // Constructor to wrap a value - pub fn wrap(value: T) -> Self { - Self { value, signal_hook: None } - } - +impl Beacon { // Set the drop hook pub fn on_signal(&mut self, drop_hook: F) where @@ -103,21 +98,19 @@ where } } -impl Deref for Beacon { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.value - } -} - -pub type StateSnapshot = Beacon; - /// Readonly snapshot of Language server state. -pub struct SnapshotInternal { +pub struct StateSnapshot { pub db: salsa::Snapshot, pub open_files: Snapshot>, pub config: Snapshot, + /// Beacon to signal when the snapshot is no longer used + pub beacon: Beacon, +} + +impl StateSnapshot { + pub(crate) fn signal_finish(&mut self) { + self.beacon.signal(); + } } impl std::panic::UnwindSafe for StateSnapshot {}