Skip to content
This repository has been archived by the owner on Aug 31, 2023. It is now read-only.

Commit

Permalink
feat(rome_lsp): shutdown the automatically spawned daemon instance on…
Browse files Browse the repository at this point in the history
…ce the last client disconnect
  • Loading branch information
leops committed Nov 10, 2022
1 parent 90ec8a4 commit b6450b0
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 18 deletions.
10 changes: 6 additions & 4 deletions crates/rome_cli/src/commands/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{

pub(crate) fn start(mut session: CliSession) -> Result<(), Termination> {
let rt = Runtime::new()?;
let did_spawn = rt.block_on(ensure_daemon())?;
let did_spawn = rt.block_on(ensure_daemon(true))?;

if did_spawn {
session.app.console.log(markup! {
Expand Down Expand Up @@ -60,11 +60,13 @@ pub(crate) fn stop(mut session: CliSession) -> Result<(), Termination> {
Ok(())
}

pub(crate) fn run_server() -> Result<(), Termination> {
pub(crate) fn run_server(mut session: CliSession) -> Result<(), Termination> {
setup_tracing_subscriber();

let no_timeout = session.args.contains("--no-timeout");

let rt = Runtime::new()?;
let factory = ServerFactory::default();
let factory = ServerFactory::new(!no_timeout);
let cancellation = factory.cancellation();
let span = debug_span!("Running Server", pid = std::process::id());

Expand Down Expand Up @@ -101,7 +103,7 @@ pub(crate) fn lsp_proxy() -> Result<(), Termination> {
/// Receives a process via `stdin` and then copy the content to the LSP socket.
/// Copy to the process on `stdout` when the LSP responds to a message
async fn start_lsp_proxy(rt: &Runtime) -> Result<(), Termination> {
ensure_daemon().await?;
ensure_daemon(false).await?;

match open_socket().await? {
Some((mut owned_read_half, mut owned_write_half)) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/rome_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<'app> CliSession<'app> {
Some("lsp-proxy") => commands::daemon::lsp_proxy(),

// Internal commands
Some("__run_server") => commands::daemon::run_server(),
Some("__run_server") => commands::daemon::run_server(self),
Some("__print_socket") => commands::daemon::print_socket(),

// Print the help for known commands called without any arguments, and exit with an error
Expand Down
12 changes: 8 additions & 4 deletions crates/rome_cli/src/service/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@ async fn try_connect() -> io::Result<UnixStream> {
}

/// Spawn the daemon server process in the background
fn spawn_daemon() -> io::Result<Child> {
fn spawn_daemon(no_timeout: bool) -> io::Result<Child> {
let binary = env::current_exe()?;

let mut cmd = Command::new(binary);
cmd.arg("__run_server");

if no_timeout {
cmd.arg("--no-timeout");
}

// Create a new session for the process and make it the leader, this will
// ensures that the child process is fully detached from its parent and will
// continue running in the background even after the parent process exits
Expand Down Expand Up @@ -102,7 +106,7 @@ pub(crate) async fn open_socket() -> io::Result<Option<(OwnedReadHalf, OwnedWrit
///
/// Returns false if the daemon process was already running or true if it had
/// to be started
pub(crate) async fn ensure_daemon() -> io::Result<bool> {
pub(crate) async fn ensure_daemon(no_timeout: bool) -> io::Result<bool> {
let mut current_child: Option<Child> = None;
let mut last_error = None;

Expand Down Expand Up @@ -140,7 +144,7 @@ pub(crate) async fn ensure_daemon() -> io::Result<bool> {
} else {
// Spawn the daemon process and wait a few milliseconds for
// it to become ready then retry the connection
current_child = Some(spawn_daemon()?);
current_child = Some(spawn_daemon(no_timeout)?);
time::sleep(Duration::from_millis(50)).await;
}
}
Expand All @@ -162,7 +166,7 @@ pub(crate) async fn ensure_daemon() -> io::Result<bool> {
/// Ensure the server daemon is running and ready to receive connections and
/// print the global socket name in the standard output
pub(crate) async fn print_socket() -> io::Result<()> {
ensure_daemon().await?;
ensure_daemon(false).await?;
println!("{}", get_socket_name().display());
Ok(())
}
Expand Down
12 changes: 8 additions & 4 deletions crates/rome_cli/src/service/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ async fn try_connect() -> io::Result<NamedPipeClient> {
const CREATE_NEW_PROCESS_GROUP: u32 = 0x00000200;

/// Spawn the daemon server process in the background
fn spawn_daemon() -> io::Result<()> {
fn spawn_daemon(no_timeout: bool) -> io::Result<()> {
let binary = env::current_exe()?;

let mut cmd = Command::new(binary);
cmd.arg("__run_server");

if no_timeout {
cmd.arg("--no-timeout");
}

cmd.creation_flags(CREATE_NEW_PROCESS_GROUP);

cmd.spawn()?;
Expand Down Expand Up @@ -164,14 +168,14 @@ impl AsyncWrite for ClientWriteHalf {
///
/// Returns false if the daemon process was already running or true if it had
/// to be started
pub(crate) async fn ensure_daemon() -> io::Result<bool> {
pub(crate) async fn ensure_daemon(no_timeout: bool) -> io::Result<bool> {
let mut did_spawn = false;

loop {
match open_socket().await {
Ok(Some(_)) => break,
Ok(None) => {
spawn_daemon()?;
spawn_daemon(no_timeout)?;
did_spawn = true;
time::sleep(Duration::from_millis(50)).await;
}
Expand All @@ -185,7 +189,7 @@ pub(crate) async fn ensure_daemon() -> io::Result<bool> {
/// Ensure the server daemon is running and ready to receive connections and
/// print the global pipe name in the standard output
pub(crate) async fn print_socket() -> io::Result<()> {
ensure_daemon().await?;
ensure_daemon(false).await?;
println!("{}", get_pipe_name());
Ok(())
}
Expand Down
48 changes: 44 additions & 4 deletions crates/rome_lsp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use rome_service::{workspace, Workspace};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::Notify;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tower_lsp::jsonrpc::Result as LspResult;
use tower_lsp::{lsp_types::*, ClientSocket};
use tower_lsp::{LanguageServer, LspService, Server};
Expand All @@ -24,11 +26,18 @@ pub struct LSPServer {
session: SessionHandle,
/// Map of all sessions connected to the same [ServerFactory] as this [LSPServer].
sessions: Sessions,
/// If this is true the server will broadcast a shutdown signal once the
/// last client disconnected after a short timeout
has_timeout: bool,
}

impl LSPServer {
fn new(session: SessionHandle, sessions: Sessions) -> Self {
Self { session, sessions }
fn new(session: SessionHandle, sessions: Sessions, has_timeout: bool) -> Self {
Self {
session,
sessions,
has_timeout,
}
}

async fn syntax_tree_request(&self, params: SyntaxTreePayload) -> LspResult<String> {
Expand Down Expand Up @@ -329,12 +338,29 @@ impl Drop for LSPServer {
fn drop(&mut self) {
if let Ok(mut sessions) = self.sessions.lock() {
let _removed = sessions.remove(&self.session.key);

debug_assert!(_removed.is_some(), "Session did not exist.");

if self.has_timeout && sessions.is_empty() {
tokio::spawn(server_timeout(
self.session.cancellation.clone(),
self.sessions.clone(),
));
}
}
}
}

/// Broadcast the shutdown signal after one minute if the server has no active session
async fn server_timeout(cancellation: Arc<Notify>, sessions: Sessions) {
sleep(Duration::from_secs(60)).await;

if !sessions.lock().unwrap().is_empty() {
return;
}

cancellation.notify_one();
}

/// Map of active sessions connected to a [ServerFactory].
type Sessions = Arc<Mutex<HashMap<SessionKey, SessionHandle>>>;

Expand All @@ -356,6 +382,10 @@ pub struct ServerFactory {

/// Session key generator. Stores the key of the next session.
next_session_key: AtomicU64,

/// If this is true the server will broadcast a shutdown signal once the
/// last client disconnected after a short timeout
has_timeout: bool,
}

/// Helper method for wrapping a [Workspace] method in a `custom_method` for
Expand Down Expand Up @@ -393,6 +423,16 @@ macro_rules! workspace_method {
}

impl ServerFactory {
pub fn new(has_timeout: bool) -> Self {
Self {
cancellation: Arc::default(),
workspace: None,
sessions: Sessions::default(),
next_session_key: AtomicU64::new(0),
has_timeout,
}
}

/// Create a new [ServerConnection] from this factory
pub fn create(&self) -> ServerConnection {
let workspace = self
Expand All @@ -408,7 +448,7 @@ impl ServerFactory {

let mut sessions = self.sessions.lock().unwrap();
sessions.insert(session_key, handle.clone());
LSPServer::new(handle, self.sessions.clone())
LSPServer::new(handle, self.sessions.clone(), self.has_timeout)
});

builder = builder.custom_method(SYNTAX_TREE_REQUEST, LSPServer::syntax_tree_request);
Expand Down
2 changes: 1 addition & 1 deletion crates/rome_lsp/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub(crate) struct Session {
documents: RwLock<HashMap<lsp_types::Url, Document>>,
url_interner: RwLock<UrlInterner>,

cancellation: Arc<Notify>,
pub(crate) cancellation: Arc<Notify>,
}

pub(crate) type SessionHandle = Arc<Session>;
Expand Down

0 comments on commit b6450b0

Please sign in to comment.