Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analyzing progress notifications #44

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ impl Config {
response.pop_front().as_ref().and_then(Value::as_bool).unwrap_or_default();
state.config.enable_proc_macros =
response.pop_front().as_ref().and_then(Value::as_bool).unwrap_or(false);
state
.analysis_progress_controller
.set_procmacros_enabled(state.config.enable_proc_macros);

debug!("reloaded configuration: {:#?}", state.config);

Expand Down
File renamed without changes.
180 changes: 180 additions & 0 deletions src/ide/analysis_progress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use lsp_types::notification::Notification;

use crate::id_generator::IdGenerator;
use crate::server::client::Notifier;
use crate::state::Beacon;

/// A facade for `AnalysisProgressController` that allows to track progress of diagnostics
Arcticae marked this conversation as resolved.
Show resolved Hide resolved
/// generation and procmacro requests.
#[derive(Clone)]
pub struct AnalysisProgressTracker {
controller: AnalysisProgressController,
}

impl AnalysisProgressTracker {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can split it for proc macros / diagnostics so interface is clear there, also it does not have to store whole controller (and should not if it possible, if not then maybe controller itself is ok).

/// Signals that a request to proc macro server was made during the current generation of
/// diagnostics.
pub fn register_procmacro_request(&self) {
self.controller.set_did_submit_procmacro_request(true);
}

/// Sets handlers for tracking beacons sent to threads.
/// The beacons are wrapping snapshots, which are signalling when diagnostics finished
/// calculating for a given snapshot (used for calculating files diagnostics or removing
/// stale ones)
pub fn track_analysis<'a>(&self, beacons: impl Iterator<Item = &'a mut Beacon>) {
let gen_id = self.controller.next_generation_id();

self.controller.clear_active_snapshots();

beacons.enumerate().for_each(|(i, beacon)| {
self.controller.insert_active_snapshot(i);

let controller_ref: AnalysisProgressController = self.controller.clone();
beacon.on_signal(move || controller_ref.on_snapshot_deactivate(gen_id, i));
});

self.controller.start_analysis();
}
}

/// Controller used to send notifications to the client about analysis progress.
/// Uses information provided from other controllers (diagnostics controller, procmacro controller)
/// to assess if diagnostics are in fact calculated.
#[derive(Debug, Clone)]
pub struct AnalysisProgressController {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get flow explanation here? like in ProcMacroController

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do although i think it's nicely explained here in the docstrings

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBD Still - i would like to finalize business logic first if that's OK so i don't have to update this flowchart

notifier: Notifier,
/// ID of the diagnostics "generation" - the scheduled diagnostics jobs set.
/// Used to filter out stale threads finishing when new ones (from newer "generation")
/// are already in progress and being tracked by the controller.
generation_id: Arc<AtomicU64>,
/// Sequential IDs of state snapshots from the current generation, used to track their status
/// (present meaning it's still being used)
active_snapshots: Arc<Mutex<HashSet<usize>>>,
id_generator: Arc<IdGenerator>,
/// If `true` - a request to procmacro server was submitted, meaning that analysis will extend
/// beyond the current generation of diagnostics.
did_submit_procmacro_request: Arc<AtomicBool>,
/// Indicates that a notification was sent and analysis (i.e. macro expansion) is taking place.
analysis_in_progress: Arc<AtomicBool>,
/// Loaded asynchronously from config - unset if config was not loaded yet.
/// Has to be set in order for analysis to finish.
procmacros_enabled: Arc<Mutex<Option<bool>>>,
}

impl AnalysisProgressController {
pub fn tracker(&self) -> AnalysisProgressTracker {
AnalysisProgressTracker { controller: self.clone() }
}

pub fn new(notifier: Notifier) -> Self {
let id_generator = Arc::new(IdGenerator::default());
Self {
notifier,
id_generator: id_generator.clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why clone here, just generate unique_id before the struct constructor expr

active_snapshots: Arc::new(Mutex::new(HashSet::default())),
did_submit_procmacro_request: Arc::new(AtomicBool::new(false)),
analysis_in_progress: Arc::new(AtomicBool::new(false)),
procmacros_enabled: Arc::new(Mutex::new(None)),
generation_id: Arc::new(AtomicU64::new(id_generator.unique_id())),
}
}

pub fn set_did_submit_procmacro_request(&self, value: bool) {
self.did_submit_procmacro_request.store(value, Ordering::SeqCst);
}

/// Allows to set the procmacro configuration to whatever is in the config, upon loading it.
pub fn set_procmacros_enabled(&self, value: bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn set_procmacros_enabled(&self, value: bool) {
pub fn on_config_change(&self, config: &Config) {

More future proof and it is clear where this info is coming from.

let mut guard = self.procmacros_enabled.lock().unwrap();
*guard = Some(value);
}

pub fn insert_active_snapshot(&self, snapshot_id: usize) {
let mut active_snapshots = self.active_snapshots.lock().unwrap();
active_snapshots.insert(snapshot_id);
}

pub fn on_snapshot_deactivate(&self, snapshot_gen_id: u64, snapshot_id: usize) {
let current_gen = self.get_generation_id();
if current_gen == snapshot_gen_id {
self.remove_active_snapshot(snapshot_id);
self.try_stop_analysis();
}
}

pub fn next_generation_id(&self) -> u64 {
let new_gen_id = self.id_generator.unique_id();
self.generation_id.store(new_gen_id, Ordering::SeqCst);
new_gen_id
}

pub fn get_generation_id(&self) -> u64 {
self.generation_id.load(Ordering::SeqCst)
}
Comment on lines +116 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used in one place, just inline it imo


pub fn remove_active_snapshot(&self, snapshot_id: usize) {
let mut active_snapshots = self.active_snapshots.lock().unwrap();
active_snapshots.remove(&snapshot_id);
}

pub fn clear_active_snapshots(&self) {
let active_snapshots_ref = self.active_snapshots.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to clone here?

active_snapshots_ref.lock().unwrap().clear();
}

/// Starts a next generation of diagnostics, sends a notification
fn start_analysis(&self) {
let analysis_in_progress = self.analysis_in_progress.load(Ordering::SeqCst);
let config_loaded = self.procmacros_enabled.lock().unwrap().is_some();
// We want to clear this flag always when starting a new generation to track the requests
// properly
self.did_submit_procmacro_request.store(false, Ordering::SeqCst);

if !analysis_in_progress && config_loaded {
self.analysis_in_progress.store(true, Ordering::SeqCst);
self.notifier.notify::<DiagnosticsCalculationStart>(());
}
}

/// Checks a bunch of conditions and if they are fulfilled, sends stop notification
/// and resets the state back to start of generation defaults.
fn try_stop_analysis(&self) {
let did_submit_procmacro_request = self.did_submit_procmacro_request.load(Ordering::SeqCst);
let snapshots_empty = self.active_snapshots.lock().unwrap().is_empty();
let analysis_in_progress = self.analysis_in_progress.load(Ordering::SeqCst);
let procmacros_enabled = *self.procmacros_enabled.lock().unwrap();

if snapshots_empty
&& (!did_submit_procmacro_request || (procmacros_enabled == Some(false)))
&& analysis_in_progress
{
self.did_submit_procmacro_request.store(false, Ordering::SeqCst);
self.analysis_in_progress.store(false, Ordering::SeqCst);

self.notifier.notify::<DiagnosticsCalculationFinish>(());
}
}
}

/// Notifies about diagnostics generation which is beginning to calculate
#[derive(Debug)]
pub struct DiagnosticsCalculationStart;

impl Notification for DiagnosticsCalculationStart {
type Params = ();
const METHOD: &'static str = "cairo/diagnosticsCalculationStart";
}

/// Notifies about diagnostics generation which ended calculating
#[derive(Debug)]
pub struct DiagnosticsCalculationFinish;

impl Notification for DiagnosticsCalculationFinish {
type Params = ();
const METHOD: &'static str = "cairo/diagnosticsCalculationFinish";
}
Comment on lines +164 to +180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idk if it shouldn't go to lsp/ext.rs for consistency

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why not place it near the usage tbh

1 change: 1 addition & 0 deletions src/ide/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod analysis_progress;
pub mod code_actions;
pub mod completion;
pub mod formatter;
Expand Down
28 changes: 20 additions & 8 deletions src/lang/diagnostics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ use tracing::{error, trace};
use self::project_diagnostics::ProjectDiagnostics;
use self::refresh::{clear_old_diagnostics, refresh_diagnostics};
use self::trigger::trigger;
use crate::ide::analysis_progress::AnalysisProgressTracker;
use crate::lang::diagnostics::file_batches::{batches, find_primary_files, find_secondary_files};
use crate::lang::lsp::LsProtoGroup;
use crate::server::client::Notifier;
use crate::server::panic::cancelled_anyhow;
use crate::server::schedule::thread::{self, JoinHandle, ThreadPriority};
use crate::state::{State, StateSnapshot};
use crate::state::{Beacon, State, StateSnapshot};

mod file_batches;
mod file_diagnostics;
Expand All @@ -41,9 +42,11 @@ pub struct DiagnosticsController {

impl DiagnosticsController {
/// Creates a new diagnostics controller.
pub fn new(notifier: Notifier) -> Self {
pub fn new(notifier: Notifier, analysis_progress_tracker: AnalysisProgressTracker) -> Self {
let (trigger, receiver) = trigger();
let (thread, parallelism) = DiagnosticsControllerThread::spawn(receiver, notifier);
let (thread, parallelism) =
DiagnosticsControllerThread::spawn(receiver, notifier, analysis_progress_tracker);

Self {
trigger,
_thread: thread,
Expand All @@ -53,8 +56,7 @@ impl DiagnosticsController {

/// Schedules diagnostics refreshing on snapshot(s) of the current state.
pub fn refresh(&self, state: &State) {
let state_snapshots = StateSnapshots::new(state, &self.state_snapshots_props);
self.trigger.activate(state_snapshots);
self.trigger.activate(StateSnapshots::new(state, &self.state_snapshots_props));
}
}

Expand All @@ -64,6 +66,7 @@ struct DiagnosticsControllerThread {
notifier: Notifier,
pool: thread::Pool,
project_diagnostics: ProjectDiagnostics,
analysis_progress_tracker: AnalysisProgressTracker,
}

impl DiagnosticsControllerThread {
Expand All @@ -72,10 +75,12 @@ impl DiagnosticsControllerThread {
fn spawn(
receiver: trigger::Receiver<StateSnapshots>,
notifier: Notifier,
analysis_progress_tracker: AnalysisProgressTracker,
) -> (JoinHandle, NonZero<usize>) {
let this = Self {
receiver,
notifier,
analysis_progress_tracker,
pool: thread::Pool::new(),
project_diagnostics: ProjectDiagnostics::new(),
};
Expand All @@ -92,7 +97,8 @@ impl DiagnosticsControllerThread {

/// Runs diagnostics controller's event loop.
fn event_loop(&self) {
while let Some(state_snapshots) = self.receiver.wait() {
while let Some(mut state_snapshots) = self.receiver.wait() {
self.analysis_progress_tracker.track_analysis(&mut state_snapshots.beacons());
if let Err(err) = catch_unwind(AssertUnwindSafe(|| {
self.diagnostics_controller_tick(state_snapshots);
})) {
Expand All @@ -109,7 +115,7 @@ impl DiagnosticsControllerThread {
/// Runs a single tick of the diagnostics controller's event loop.
#[tracing::instrument(skip_all)]
fn diagnostics_controller_tick(&self, state_snapshots: StateSnapshots) {
let (state, primary_snapshots, secondary_snapshots) = state_snapshots.split();
let (mut state, primary_snapshots, secondary_snapshots) = state_snapshots.split();

let primary_set = find_primary_files(&state.db, &state.open_files);
let primary: Vec<_> = primary_set.iter().copied().collect();
Expand All @@ -126,6 +132,7 @@ impl DiagnosticsControllerThread {

self.spawn_worker(move |project_diagnostics, notifier| {
clear_old_diagnostics(files_to_preserve, project_diagnostics, notifier);
state.signal_finish();
});
}

Expand All @@ -150,7 +157,7 @@ impl DiagnosticsControllerThread {
fn spawn_refresh_worker(&self, files: &[FileId], state_snapshots: Vec<StateSnapshot>) {
let files_batches = batches(files, self.pool.parallelism());
assert_eq!(files_batches.len(), state_snapshots.len());
for (batch, state) in zip(files_batches, state_snapshots) {
for (batch, mut state) in zip(files_batches, state_snapshots) {
self.spawn_worker(move |project_diagnostics, notifier| {
refresh_diagnostics(
&state.db,
Expand All @@ -159,6 +166,7 @@ impl DiagnosticsControllerThread {
project_diagnostics,
notifier,
);
state.signal_finish();
});
}
}
Expand Down Expand Up @@ -190,6 +198,10 @@ impl StateSnapshots {
let secondary = snapshots.split_off(snapshots.len() / 2);
(control, snapshots, secondary)
}

fn beacons(&mut self) -> impl Iterator<Item = &mut Beacon> {
self.0.iter_mut().map(|snapshot| &mut snapshot.beacon)
}
}

/// Stores necessary properties for creating [`StateSnapshots`].
Expand Down
26 changes: 23 additions & 3 deletions src/lang/proc_macros/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::sync::{Mutex, MutexGuard};

use anyhow::{Context, Result, anyhow, ensure};
Expand All @@ -16,8 +17,10 @@ use scarb_proc_macro_server_types::methods::expand::{
pub use status::ClientStatus;
use tracing::error;

use crate::id_generator;
use crate::ide::analysis_progress::AnalysisProgressTracker;

pub mod connection;
mod id_generator;
pub mod status;

#[derive(Debug)]
Expand All @@ -27,21 +30,26 @@ pub enum RequestParams {
Inline(ExpandInlineMacroParams),
}

#[derive(Debug)]
pub struct ProcMacroClient {
connection: ProcMacroServerConnection,
id_generator: id_generator::IdGenerator,
requests_params: Mutex<HashMap<RequestId, RequestParams>>,
error_channel: Sender<()>,
analysis_progress_tracker: AnalysisProgressTracker,
}

impl ProcMacroClient {
pub fn new(connection: ProcMacroServerConnection, error_channel: Sender<()>) -> Self {
pub fn new(
connection: ProcMacroServerConnection,
error_channel: Sender<()>,
analysis_progress_tracker: AnalysisProgressTracker,
) -> Self {
Self {
connection,
id_generator: Default::default(),
requests_params: Default::default(),
error_channel,
analysis_progress_tracker,
}
}

Expand Down Expand Up @@ -142,6 +150,7 @@ impl ProcMacroClient {
match self.send_request_untracked::<M>(id, &params) {
Ok(()) => {
requests_params.insert(id, map(params));
self.analysis_progress_tracker.register_procmacro_request();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that there is a small chance that a yet alive snapshot from previous diagnostics calculation was already in db.get_attribute_expansion before its cancellation. It will still call this method and possibly register proc macro request in analysis tracker AFTER track_analysis is called to start tracking the new diagnostics batch. It can cause us to never finish waiting for diagnostics in tests, since:

  • the next batch won't be scheduled
  • we will always think there is some proc macro yet to resolve while it may not be

}
Err(err) => {
error!("Sending request to proc-macro-server failed: {err:?}");
Expand All @@ -156,6 +165,17 @@ impl ProcMacroClient {
}
}

impl Debug for ProcMacroClient {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProcMacroClient")
.field("connection", &self.connection)
.field("id_generator", &self.id_generator)
.field("requests_params", &self.requests_params)
.field("error_channel", &self.error_channel)
.finish()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.finish()
.finish_non_exhaustive()

}
}

pub struct Responses<'a> {
responses: MutexGuard<'a, VecDeque<RpcResponse>>,
requests: MutexGuard<'a, HashMap<RequestId, RequestParams>>,
Expand Down
Loading
Loading