Skip to content

Commit

Permalink
Wrap Connection to facilitate proper server shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
snowsignal committed Apr 30, 2024
1 parent 4779dd1 commit 7d0fba9
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 53 deletions.
4 changes: 2 additions & 2 deletions crates/ruff_server/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::server::ClientSender;

static MESSENGER: OnceLock<ClientSender> = OnceLock::new();

pub(crate) fn init_messenger(client_sender: &ClientSender) {
pub(crate) fn init_messenger(client_sender: ClientSender) {
MESSENGER
.set(client_sender.clone())
.set(client_sender)
.expect("messenger should only be initialized once");

// unregister any previously registered panic hook
Expand Down
69 changes: 33 additions & 36 deletions crates/ruff_server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use std::num::NonZeroUsize;

use lsp::Connection;
use lsp_server as lsp;
use lsp_types as types;
use types::ClientCapabilities;
Expand All @@ -18,6 +17,8 @@ use types::TextDocumentSyncOptions;
use types::WorkDoneProgressOptions;
use types::WorkspaceFoldersServerCapabilities;

use self::connection::Connection;
use self::connection::ConnectionInitializer;
use self::schedule::event_loop_thread;
use self::schedule::Scheduler;
use self::schedule::Task;
Expand All @@ -28,34 +29,39 @@ use crate::PositionEncoding;

mod api;
mod client;
mod connection;
mod schedule;

pub(crate) use client::ClientSender;
pub(crate) use connection::ClientSender;

pub(crate) type Result<T> = std::result::Result<T, api::Error>;

pub struct Server {
conn: lsp::Connection,
connection: Connection,
client_capabilities: ClientCapabilities,
threads: lsp::IoThreads,
worker_threads: NonZeroUsize,
session: Session,
}

impl Server {
pub fn new(worker_threads: NonZeroUsize) -> crate::Result<Self> {
let (conn, threads) = lsp::Connection::stdio();
let connection = ConnectionInitializer::stdio();

crate::message::init_messenger(&conn.sender);

let (id, params) = conn.initialize_start()?;

let init_params: types::InitializeParams = serde_json::from_value(params)?;
let (id, init_params) = connection.initialize_start()?;

let client_capabilities = init_params.capabilities;
let position_encoding = Self::find_best_position_encoding(&client_capabilities);
let server_capabilities = Self::server_capabilities(position_encoding);

let connection = connection.initialize_finish(
id,
&server_capabilities,
crate::SERVER_NAME,
crate::version(),
)?;

crate::message::init_messenger(connection.make_sender());

let AllSettings {
global_settings,
mut workspace_settings,
Expand Down Expand Up @@ -86,19 +92,8 @@ impl Server {
anyhow::anyhow!("Failed to get the current working directory while creating a default workspace.")
})?;

let initialize_data = serde_json::json!({
"capabilities": server_capabilities,
"serverInfo": {
"name": crate::SERVER_NAME,
"version": crate::version()
}
});

conn.initialize_finish(id, initialize_data)?;

Ok(Self {
conn,
threads,
connection,
worker_threads,
session: Session::new(
&client_capabilities,
Expand All @@ -111,17 +106,20 @@ impl Server {
}

pub fn run(self) -> crate::Result<()> {
let result = event_loop_thread(move || {
event_loop_thread(move || {
Self::event_loop(
&self.conn,
&self.connection,
&self.client_capabilities,
self.session,
self.worker_threads,
)
)?;
self.connection.join_io_threads()?;
// Note: when we start routing tracing through the LSP,
// this should be replaced with a log directly to `stderr`.
tracing::info!("Server has shut down successfully");
Ok(())
})?
.join();
self.threads.join()?;
result
.join()
}

#[allow(clippy::needless_pass_by_value)] // this is because we aren't using `next_request_id` yet.
Expand All @@ -132,22 +130,21 @@ impl Server {
worker_threads: NonZeroUsize,
) -> crate::Result<()> {
let mut scheduler =
schedule::Scheduler::new(&mut session, worker_threads, &connection.sender);
schedule::Scheduler::new(&mut session, worker_threads, connection.make_sender());

Self::try_register_capabilities(client_capabilities, &mut scheduler);
for msg in &connection.receiver {
for msg in connection.receive_messages() {
if connection.handle_shutdown(&msg)? {
break;
}
let task = match msg {
lsp::Message::Request(req) => {
if connection.handle_shutdown(&req)? {
return Ok(());
}
api::request(req)
}
lsp::Message::Request(req) => api::request(req),
lsp::Message::Notification(notification) => api::notification(notification),
lsp::Message::Response(response) => scheduler.response(response),
};
scheduler.dispatch(task);
}

Ok(())
}

Expand Down
19 changes: 8 additions & 11 deletions crates/ruff_server/src/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use lsp_server::{Notification, RequestId};
use rustc_hash::FxHashMap;
use serde_json::Value;

use super::schedule::Task;

pub(crate) type ClientSender = crossbeam::channel::Sender<lsp_server::Message>;
use super::{schedule::Task, ClientSender};

type ResponseBuilder<'s> = Box<dyn FnOnce(lsp_server::Response) -> Task<'s>>;

Expand All @@ -29,12 +27,12 @@ pub(crate) struct Requester<'s> {
}

impl<'s> Client<'s> {
pub(super) fn new(sender: &ClientSender) -> Self {
pub(super) fn new(sender: ClientSender) -> Self {
Self {
notifier: Notifier(sender.clone()),
responder: Responder(sender.clone()),
requester: Requester {
sender: sender.clone(),
sender,
next_request_id: 1,
response_handlers: FxHashMap::default(),
},
Expand All @@ -60,16 +58,15 @@ impl Notifier {

let message = lsp_server::Message::Notification(Notification::new(method, params));

Ok(self.0.send(message)?)
self.0.send(message)
}

pub(crate) fn notify_method(&self, method: String) -> crate::Result<()> {
Ok(self
.0
self.0
.send(lsp_server::Message::Notification(Notification::new(
method,
Value::Null,
)))?)
)))
}
}

Expand All @@ -82,15 +79,15 @@ impl Responder {
where
R: serde::Serialize,
{
Ok(self.0.send(
self.0.send(
match result {
Ok(res) => lsp_server::Response::new_ok(id, res),
Err(crate::server::api::Error { code, error }) => {
lsp_server::Response::new_err(id, code as i32, format!("{error}"))
}
}
.into(),
)?)
)
}
}

Expand Down
138 changes: 138 additions & 0 deletions crates/ruff_server/src/server/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use lsp::IoThreads;
use lsp_server as lsp;
use lsp_types::{notification::Notification, request::Request};
use std::sync::{Arc, Weak};

type ConnectionSender = crossbeam::channel::Sender<lsp::Message>;
type ConnectionReceiver = crossbeam::channel::Receiver<lsp::Message>;

pub(crate) struct ConnectionInitializer {
connection: lsp::Connection,
threads: lsp::IoThreads,
}

pub(crate) struct Connection {
sender: Arc<ConnectionSender>,
receiver: ConnectionReceiver,
threads: lsp::IoThreads,
}

impl ConnectionInitializer {
/// Create a new LSP server connection over stdin/stdout.
pub(super) fn stdio() -> Self {
let (connection, threads) = lsp::Connection::stdio();
Self {
connection,
threads,
}
}

/// Starts the initialization process with the client by listening for an intialization request.
/// Returns a request ID that should be passed into `initialize_finish` later,
/// along with the initialization parameters that were provided.
pub(super) fn initialize_start(
&self,
) -> crate::Result<(lsp::RequestId, lsp_types::InitializeParams)> {
let (id, params) = self.connection.initialize_start()?;
Ok((id, serde_json::from_value(params)?))
}

/// Finishes the initialization process with the client,
/// returning an initialized `Connection`.
pub(super) fn initialize_finish(
self,
id: lsp::RequestId,
server_capabilities: &lsp_types::ServerCapabilities,
name: &str,
version: &str,
) -> crate::Result<Connection> {
self.connection.initialize_finish(
id,
serde_json::json!({
"capabilities": server_capabilities,
"serverInfo": {
"name": name,
"version": version
}
}),
)?;
Ok(Connection::initialized(self.connection, self.threads))
}
}

impl Connection {
fn initialized(
lsp::Connection { sender, receiver }: lsp::Connection,
threads: IoThreads,
) -> Self {
Self {
sender: Arc::new(sender),
receiver,
threads,
}
}
pub(super) fn make_sender(&self) -> ClientSender {
ClientSender {
weak_sender: Arc::downgrade(&self.sender),
}
}

pub(super) fn receive_messages(&self) -> crossbeam::channel::Iter<lsp::Message> {
self.receiver.iter()
}

///
pub(super) fn handle_shutdown(&self, message: &lsp::Message) -> crate::Result<bool> {
match message {
lsp::Message::Request(lsp::Request { id, method, .. })
if method == lsp_types::request::Shutdown::METHOD =>
{
self.sender
.send(lsp::Response::new_ok(id.clone(), ()).into())?;
tracing::info!("Shutdown request received. Waiting for an exit notification...");
match self.receiver.recv_timeout(std::time::Duration::from_secs(30))? {
lsp::Message::Notification(lsp::Notification { method, .. }) if method == lsp_types::notification::Exit::METHOD => {
tracing::info!("Exit notification received. Server shutting down...");
Ok(true)
},
message => anyhow::bail!("Server received unexpected message {message:?} while waiting for exit notification")
}
}
lsp::Message::Notification(lsp::Notification { method, .. })
if method == lsp_types::notification::Exit::METHOD =>
{
tracing::error!("Server received an exit notification before a shutdown request was sent. Exiting...");
Ok(true)
}
_ => Ok(false),
}
}

/// Joins the I/O threads that underpin this connection.
pub(super) fn join_io_threads(self) -> crate::Result<()> {
std::mem::drop(
Arc::into_inner(self.sender)
.expect("the client sender shouldn't have more than one strong reference"),
);
std::mem::drop(self.receiver);
self.threads.join()?;
Ok(())
}
}

/// A weak reference to an underlying sender channel for communication with the client.
#[derive(Clone, Debug)]
pub(crate) struct ClientSender {
weak_sender: Weak<ConnectionSender>,
}

// note: additional wrapper functions for senders may be implemented as needed.
impl ClientSender {
pub(crate) fn send(&self, msg: lsp::Message) -> crate::Result<()> {
let Some(sender) = self.weak_sender.upgrade() else {
anyhow::bail!("The connection with the client has been closed");
};

Ok(sender.send(msg)?)
}
}
6 changes: 2 additions & 4 deletions crates/ruff_server/src/server/schedule.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::num::NonZeroUsize;

use crossbeam::channel::Sender;

use crate::session::Session;

mod task;
Expand All @@ -14,7 +12,7 @@ use self::{
thread::ThreadPriority,
};

use super::client::Client;
use super::{client::Client, ClientSender};

/// The event loop thread is actually a secondary thread that we spawn from the
/// _actual_ main thread. This secondary thread has a larger stack size
Expand Down Expand Up @@ -45,7 +43,7 @@ impl<'s> Scheduler<'s> {
pub(super) fn new(
session: &'s mut Session,
worker_threads: NonZeroUsize,
sender: &Sender<lsp_server::Message>,
sender: ClientSender,
) -> Self {
const FMT_THREADS: usize = 1;
Self {
Expand Down

0 comments on commit 7d0fba9

Please sign in to comment.