Skip to content

Commit

Permalink
Revert "lsp: Parse LSP messages on background thread - again (#23122)"
Browse files Browse the repository at this point in the history
This reverts commit 1b3b825.
  • Loading branch information
ConradIrwin committed Jan 17, 2025
1 parent 0dda985 commit 5af2261
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 83 deletions.
1 change: 0 additions & 1 deletion crates/collab/src/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4197,7 +4197,6 @@ async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering(
}],
},
);
executor.run_until_parked();
}
fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
Expand Down
4 changes: 2 additions & 2 deletions crates/editor/src/test/editor_lsp_test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ impl EditorLspTestContext {

pub fn handle_request<T, F, Fut>(
&self,
handler: F,
mut handler: F,
) -> futures::channel::mpsc::UnboundedReceiver<()>
where
T: 'static + request::Request,
T::Params: 'static + Send,
F: 'static + Send + Sync + Fn(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut,
F: 'static + Send + FnMut(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut,
Fut: 'static + Send + Future<Output = Result<T::Result>>,
{
let url = self.buffer_lsp_url.clone();
Expand Down
140 changes: 60 additions & 80 deletions crates/lsp/src/lsp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: ";
const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

type NotificationHandler = Arc<dyn Send + Sync + Fn(Option<RequestId>, Value, AsyncAppContext)>;
type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, Value, AsyncAppContext)>;
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;

Expand Down Expand Up @@ -890,7 +890,7 @@ impl LanguageServer {
pub fn on_notification<T, F>(&self, f: F) -> Subscription
where
T: notification::Notification,
F: 'static + Send + Sync + Fn(T::Params, AsyncAppContext),
F: 'static + Send + FnMut(T::Params, AsyncAppContext),
{
self.on_custom_notification(T::METHOD, f)
}
Expand All @@ -903,7 +903,7 @@ impl LanguageServer {
where
T: request::Request,
T::Params: 'static + Send,
F: 'static + Fn(T::Params, AsyncAppContext) -> Fut + Send + Sync,
F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
Fut: 'static + Future<Output = Result<T::Result>>,
{
self.on_custom_request(T::METHOD, f)
Expand Down Expand Up @@ -939,27 +939,17 @@ impl LanguageServer {
}

#[must_use]
fn on_custom_notification<Params, F>(&self, method: &'static str, f: F) -> Subscription
fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
where
F: 'static + Fn(Params, AsyncAppContext) + Send + Sync,
Params: DeserializeOwned + Send + 'static,
F: 'static + FnMut(Params, AsyncAppContext) + Send,
Params: DeserializeOwned,
{
let callback = Arc::new(f);
let prev_handler = self.notification_handlers.lock().insert(
method,
Arc::new(move |_, params, cx| {
let callback = callback.clone();

cx.spawn(move |cx| async move {
if let Some(params) = cx
.background_executor()
.spawn(async move { serde_json::from_value(params).log_err() })
.await
{
callback(params, cx);
}
})
.detach();
Box::new(move |_, params, cx| {
if let Some(params) = serde_json::from_value(params).log_err() {
f(params, cx);
}
}),
);
assert!(
Expand All @@ -973,74 +963,64 @@ impl LanguageServer {
}

#[must_use]
fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, f: F) -> Subscription
fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
where
F: 'static + Fn(Params, AsyncAppContext) -> Fut + Send + Sync,
F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
Fut: 'static + Future<Output = Result<Res>>,
Params: DeserializeOwned + Send + 'static,
Res: Serialize,
{
let outbound_tx = self.outbound_tx.clone();
let f = Arc::new(f);
let prev_handler = self.notification_handlers.lock().insert(
method,
Arc::new(move |id, params, cx| {
Box::new(move |id, params, cx| {
if let Some(id) = id {
let f = f.clone();
let deserialized_params = cx
.background_executor()
.spawn(async move { serde_json::from_value(params) });

cx.spawn({
let outbound_tx = outbound_tx.clone();
move |cx| async move {
match deserialized_params.await {
Ok(params) => {
let response = f(params, cx.clone());
let response = match response.await {
Ok(result) => Response {
jsonrpc: JSON_RPC_VERSION,
id,
value: LspResult::Ok(Some(result)),
},
Err(error) => Response {
jsonrpc: JSON_RPC_VERSION,
id,
value: LspResult::Error(Some(Error {
message: error.to_string(),
})),
},
};
if let Some(response) =
serde_json::to_string(&response).log_err()
{
outbound_tx.try_send(response).ok();
}
}
Err(error) => {
log::error!(
"error deserializing {} request: {:?}",
method,
error
);
let response = AnyResponse {
jsonrpc: JSON_RPC_VERSION,
id,
result: None,
error: Some(Error {
message: error.to_string(),
}),
};
if let Some(response) =
serde_json::to_string(&response).log_err()
{
outbound_tx.try_send(response).ok();
match serde_json::from_value(params) {
Ok(params) => {
let response = f(params, cx.clone());
cx.foreground_executor()
.spawn({
let outbound_tx = outbound_tx.clone();
async move {
let response = match response.await {
Ok(result) => Response {
jsonrpc: JSON_RPC_VERSION,
id,
value: LspResult::Ok(Some(result)),
},
Err(error) => Response {
jsonrpc: JSON_RPC_VERSION,
id,
value: LspResult::Error(Some(Error {
message: error.to_string(),
})),
},
};
if let Some(response) =
serde_json::to_string(&response).log_err()
{
outbound_tx.try_send(response).ok();
}
}
}
})
.detach();
}

Err(error) => {
log::error!("error deserializing {} request: {:?}", method, error);
let response = AnyResponse {
jsonrpc: JSON_RPC_VERSION,
id,
result: None,
error: Some(Error {
message: error.to_string(),
}),
};
if let Some(response) = serde_json::to_string(&response).log_err() {
outbound_tx.try_send(response).ok();
}
}
})
.detach();
}
}
}),
);
Expand Down Expand Up @@ -1445,12 +1425,12 @@ impl FakeLanguageServer {
/// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
pub fn handle_request<T, F, Fut>(
&self,
handler: F,
mut handler: F,
) -> futures::channel::mpsc::UnboundedReceiver<()>
where
T: 'static + request::Request,
T::Params: 'static + Send,
F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext) -> Fut,
F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
Fut: 'static + Send + Future<Output = Result<T::Result>>,
{
let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
Expand All @@ -1474,12 +1454,12 @@ impl FakeLanguageServer {
/// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
pub fn handle_notification<T, F>(
&self,
handler: F,
mut handler: F,
) -> futures::channel::mpsc::UnboundedReceiver<()>
where
T: 'static + notification::Notification,
T::Params: 'static + Send,
F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext),
F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
{
let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
self.server.remove_notification_handler::<T>();
Expand Down

0 comments on commit 5af2261

Please sign in to comment.