Skip to content

Commit

Permalink
lsp: Parse LSP messages on background thread - again
Browse files Browse the repository at this point in the history
This is a follow-up to #12640.
While profiling latency of working with a project with 8192 diagnostics I've noticed that while we're parsing the LSP messages into a generic message struct on a background thread, we can still block the main thread
as the conversion between that generic message struct and the actual LSP message (for use by callback) is still happening on the main thread.
This PR significantly constrains what a message callback can use, so that it can be executed on any thread; we also send off message conversion to the background thread.

This has improved throughput of my 8192-benchmark from 40s to send out all diagnostics after saving to ~20s. Now main thread is spending most of the time updating our diagnostics sets, which can probably be improved.
  • Loading branch information
osiewicz committed Jan 14, 2025
1 parent fcadd3e commit f452cfb
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 62 deletions.
4 changes: 2 additions & 2 deletions crates/editor/src/test/editor_lsp_test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ impl EditorLspTestContext {

pub fn handle_request<T, F, Fut>(
&self,
mut handler: F,
handler: F,
) -> futures::channel::mpsc::UnboundedReceiver<()>
where
T: 'static + request::Request,
T::Params: 'static + Send,
F: 'static + Send + FnMut(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut,
F: 'static + Send + Sync + Fn(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut,
Fut: 'static + Send + Future<Output = Result<T::Result>>,
{
let url = self.buffer_lsp_url.clone();
Expand Down
140 changes: 80 additions & 60 deletions crates/lsp/src/lsp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: ";
const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, Value, AsyncAppContext)>;
type NotificationHandler = Arc<dyn Send + Sync + Fn(Option<RequestId>, Value, AsyncAppContext)>;
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;

Expand Down Expand Up @@ -890,7 +890,7 @@ impl LanguageServer {
pub fn on_notification<T, F>(&self, f: F) -> Subscription
where
T: notification::Notification,
F: 'static + Send + FnMut(T::Params, AsyncAppContext),
F: 'static + Send + Sync + Fn(T::Params, AsyncAppContext),
{
self.on_custom_notification(T::METHOD, f)
}
Expand All @@ -903,7 +903,7 @@ impl LanguageServer {
where
T: request::Request,
T::Params: 'static + Send,
F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
F: 'static + Fn(T::Params, AsyncAppContext) -> Fut + Send + Sync,
Fut: 'static + Future<Output = Result<T::Result>>,
{
self.on_custom_request(T::METHOD, f)
Expand Down Expand Up @@ -939,17 +939,27 @@ impl LanguageServer {
}

#[must_use]
fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
fn on_custom_notification<Params, F>(&self, method: &'static str, f: F) -> Subscription
where
F: 'static + FnMut(Params, AsyncAppContext) + Send,
Params: DeserializeOwned,
F: 'static + Fn(Params, AsyncAppContext) + Send + Sync,
Params: DeserializeOwned + Send + 'static,
{
let callback = Arc::new(f);
let prev_handler = self.notification_handlers.lock().insert(
method,
Box::new(move |_, params, cx| {
if let Some(params) = serde_json::from_value(params).log_err() {
f(params, cx);
}
Arc::new(move |_, params, cx| {
let callback = callback.clone();

cx.spawn(move |cx| async move {
if let Some(params) = cx
.background_executor()
.spawn(async move { serde_json::from_value(params).log_err() })
.await
{
callback(params, cx);
}
})
.detach();
}),
);
assert!(
Expand All @@ -963,64 +973,74 @@ impl LanguageServer {
}

#[must_use]
fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, f: F) -> Subscription
where
F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
F: 'static + Fn(Params, AsyncAppContext) -> Fut + Send + Sync,
Fut: 'static + Future<Output = Result<Res>>,
Params: DeserializeOwned + Send + 'static,
Res: Serialize,
{
let outbound_tx = self.outbound_tx.clone();
let f = Arc::new(f);
let prev_handler = self.notification_handlers.lock().insert(
method,
Box::new(move |id, params, cx| {
Arc::new(move |id, params, cx| {
if let Some(id) = id {
match serde_json::from_value(params) {
Ok(params) => {
let response = f(params, cx.clone());
cx.foreground_executor()
.spawn({
let outbound_tx = outbound_tx.clone();
async move {
let response = match response.await {
Ok(result) => Response {
jsonrpc: JSON_RPC_VERSION,
id,
value: LspResult::Ok(Some(result)),
},
Err(error) => Response {
jsonrpc: JSON_RPC_VERSION,
id,
value: LspResult::Error(Some(Error {
message: error.to_string(),
})),
},
};
if let Some(response) =
serde_json::to_string(&response).log_err()
{
outbound_tx.try_send(response).ok();
}
let f = f.clone();
let deserialized_params = cx
.background_executor()
.spawn(async move { serde_json::from_value(params) });

cx.spawn({
let outbound_tx = outbound_tx.clone();
move |cx| async move {
match deserialized_params.await {
Ok(params) => {
let response = f(params, cx.clone());
let response = match response.await {
Ok(result) => Response {
jsonrpc: JSON_RPC_VERSION,
id,
value: LspResult::Ok(Some(result)),
},
Err(error) => Response {
jsonrpc: JSON_RPC_VERSION,
id,
value: LspResult::Error(Some(Error {
message: error.to_string(),
})),
},
};
if let Some(response) =
serde_json::to_string(&response).log_err()
{
outbound_tx.try_send(response).ok();
}
})
.detach();
}

Err(error) => {
log::error!("error deserializing {} request: {:?}", method, error);
let response = AnyResponse {
jsonrpc: JSON_RPC_VERSION,
id,
result: None,
error: Some(Error {
message: error.to_string(),
}),
};
if let Some(response) = serde_json::to_string(&response).log_err() {
outbound_tx.try_send(response).ok();
}
Err(error) => {
log::error!(
"error deserializing {} request: {:?}",
method,
error
);
let response = AnyResponse {
jsonrpc: JSON_RPC_VERSION,
id,
result: None,
error: Some(Error {
message: error.to_string(),
}),
};
if let Some(response) =
serde_json::to_string(&response).log_err()
{
outbound_tx.try_send(response).ok();
}
}
}
}
}
})
.detach();
}
}),
);
Expand Down Expand Up @@ -1425,12 +1445,12 @@ impl FakeLanguageServer {
/// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
pub fn handle_request<T, F, Fut>(
&self,
mut handler: F,
handler: F,
) -> futures::channel::mpsc::UnboundedReceiver<()>
where
T: 'static + request::Request,
T::Params: 'static + Send,
F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext) -> Fut,
Fut: 'static + Send + Future<Output = Result<T::Result>>,
{
let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
Expand All @@ -1454,12 +1474,12 @@ impl FakeLanguageServer {
/// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
pub fn handle_notification<T, F>(
&self,
mut handler: F,
handler: F,
) -> futures::channel::mpsc::UnboundedReceiver<()>
where
T: 'static + notification::Notification,
T::Params: 'static + Send,
F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext),
{
let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
self.server.remove_notification_handler::<T>();
Expand Down

0 comments on commit f452cfb

Please sign in to comment.