Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[relay-lsp] Ensure documents are synced before calculating completions #4473

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion compiler/crates/relay-lsp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ where
);

let task_processor = LSPTaskProcessor;
let task_queue = TaskQueue::new(Arc::new(task_processor));
let mut task_queue = TaskQueue::new(Arc::new(task_processor));
let task_scheduler = task_queue.get_scheduler();

config.artifact_writer = Box::new(NoopArtifactWriter);
Expand Down Expand Up @@ -190,6 +190,12 @@ fn next_task(
}
}

static NOTIFCATIONS_MUTATING_LSP_STATE: [&str; 3] = [
"textDocument/didOpen",
"textDocument/didChange",
"textDocument/didClose",
];

struct LSPTaskProcessor;

impl<TPerfLogger: PerfLogger + 'static, TSchemaDocumentation: SchemaDocumentation + 'static>
Expand All @@ -209,6 +215,15 @@ impl<TPerfLogger: PerfLogger + 'static, TSchemaDocumentation: SchemaDocumentatio
}
}
}

fn is_serial_task(&self, task: &Task) -> bool {
match task {
Task::InboundMessage(Message::Notification(notification)) => {
NOTIFCATIONS_MUTATING_LSP_STATE.contains(&notification.method.as_str())
}
_ => false,
}
}
}

fn handle_request<TPerfLogger: PerfLogger + 'static, TSchemaDocumentation: SchemaDocumentation>(
Expand Down
54 changes: 50 additions & 4 deletions compiler/crates/relay-lsp/src/server/task_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use std::time::Instant;

use crossbeam::channel::unbounded;
Expand All @@ -18,10 +19,13 @@ pub struct TaskQueue<S, T> {
processor: Arc<dyn TaskProcessor<S, T>>,
pub receiver: Receiver<T>,
scheduler: Arc<TaskScheduler<T>>,
active_thread_handles: Vec<JoinHandle<()>>,
}

pub trait TaskProcessor<S, T>: Send + Sync + 'static {
fn process(&self, state: Arc<S>, task: T);

fn is_serial_task(&self, task: &T) -> bool;
}

pub struct TaskScheduler<T> {
Expand All @@ -48,25 +52,67 @@ where
processor,
receiver,
scheduler: Arc::new(TaskScheduler { sender }),
active_thread_handles: Vec::new(),
}
}

pub fn get_scheduler(&self) -> Arc<TaskScheduler<T>> {
Arc::clone(&self.scheduler)
}

pub fn process(&self, state: Arc<S>, task: T) {
pub fn process(&mut self, state: Arc<S>, task: T) {
let processor = Arc::clone(&self.processor);
let is_serial_task = processor.is_serial_task(&task);
let should_join_active_threads = self.active_thread_handles.len() > 10;

if is_serial_task || should_join_active_threads {
// Before starting a serial task, we need to make sure that all
// previous tasks have been completed, otherwise the serial task
// might interfere with them.
// We also do this if there are too many "tracked" threads, since
// the threads we spawn are now no longer "detached" and it's our
// responsibility to join them at some point.
self.ensure_previous_tasks_completed();
}

let task_str = format!("{:?}", &task);
let now = Instant::now();
debug!("Processing task {:?}", &task_str);
let processor = Arc::clone(&self.processor);
thread::spawn(move || {

let thread_builder = thread::Builder::new();
let spawn_result = thread_builder.spawn(move || {
processor.process(state, task);

debug!(
"task {} completed in {}ms",
"Task {} completed in {}ms",
task_str,
now.elapsed().as_millis()
);
});

match spawn_result {
Ok(handle) => {
if is_serial_task {
// If the task is serial, we need to wait for its thread
// to complete, before moving onto the next task.
if let Err(error) = handle.join() {
debug!("Thread panicked while joining serial task: {:?}", error);
}
} else {
self.active_thread_handles.push(handle);
}
}
Err(error) => {
debug!("Failed to spawn thread to process task: {:?}", error);
}
}
}

fn ensure_previous_tasks_completed(&mut self) {
for handle in self.active_thread_handles.drain(..) {
if let Err(error) = handle.join() {
debug!("Thread panicked while joining previous task: {:?}", error);
}
}
}
}
Loading