Skip to content
This repository has been archived by the owner on Aug 31, 2023. It is now read-only.

Commit

Permalink
stop the server immediately if at least one session was initialized
Browse files Browse the repository at this point in the history
  • Loading branch information
leops committed Nov 10, 2022
1 parent b6450b0 commit 3d5820e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 44 deletions.
8 changes: 4 additions & 4 deletions crates/rome_cli/src/commands/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{

pub(crate) fn start(mut session: CliSession) -> Result<(), Termination> {
let rt = Runtime::new()?;
let did_spawn = rt.block_on(ensure_daemon(true))?;
let did_spawn = rt.block_on(ensure_daemon(false))?;

if did_spawn {
session.app.console.log(markup! {
Expand Down Expand Up @@ -63,10 +63,10 @@ pub(crate) fn stop(mut session: CliSession) -> Result<(), Termination> {
pub(crate) fn run_server(mut session: CliSession) -> Result<(), Termination> {
setup_tracing_subscriber();

let no_timeout = session.args.contains("--no-timeout");
let is_oneshot = session.args.contains("--oneshot");

let rt = Runtime::new()?;
let factory = ServerFactory::new(!no_timeout);
let factory = ServerFactory::new(is_oneshot);
let cancellation = factory.cancellation();
let span = debug_span!("Running Server", pid = std::process::id());

Expand Down Expand Up @@ -103,7 +103,7 @@ pub(crate) fn lsp_proxy() -> Result<(), Termination> {
/// Receives a process via `stdin` and then copy the content to the LSP socket.
/// Copy to the process on `stdout` when the LSP responds to a message
async fn start_lsp_proxy(rt: &Runtime) -> Result<(), Termination> {
ensure_daemon(false).await?;
ensure_daemon(true).await?;

match open_socket().await? {
Some((mut owned_read_half, mut owned_write_half)) => {
Expand Down
12 changes: 6 additions & 6 deletions crates/rome_cli/src/service/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ async fn try_connect() -> io::Result<UnixStream> {
}

/// Spawn the daemon server process in the background
fn spawn_daemon(no_timeout: bool) -> io::Result<Child> {
fn spawn_daemon(is_oneshot: bool) -> io::Result<Child> {
let binary = env::current_exe()?;

let mut cmd = Command::new(binary);
cmd.arg("__run_server");

if no_timeout {
cmd.arg("--no-timeout");
if is_oneshot {
cmd.arg("--oneshot");
}

// Create a new session for the process and make it the leader, this will
Expand Down Expand Up @@ -106,7 +106,7 @@ pub(crate) async fn open_socket() -> io::Result<Option<(OwnedReadHalf, OwnedWrit
///
/// Returns false if the daemon process was already running or true if it had
/// to be started
pub(crate) async fn ensure_daemon(no_timeout: bool) -> io::Result<bool> {
pub(crate) async fn ensure_daemon(is_oneshot: bool) -> io::Result<bool> {
let mut current_child: Option<Child> = None;
let mut last_error = None;

Expand Down Expand Up @@ -144,7 +144,7 @@ pub(crate) async fn ensure_daemon(no_timeout: bool) -> io::Result<bool> {
} else {
// Spawn the daemon process and wait a few milliseconds for
// it to become ready then retry the connection
current_child = Some(spawn_daemon(no_timeout)?);
current_child = Some(spawn_daemon(is_oneshot)?);
time::sleep(Duration::from_millis(50)).await;
}
}
Expand All @@ -166,7 +166,7 @@ pub(crate) async fn ensure_daemon(no_timeout: bool) -> io::Result<bool> {
/// Ensure the server daemon is running and ready to receive connections and
/// print the global socket name in the standard output
pub(crate) async fn print_socket() -> io::Result<()> {
ensure_daemon(false).await?;
ensure_daemon(true).await?;
println!("{}", get_socket_name().display());
Ok(())
}
Expand Down
12 changes: 6 additions & 6 deletions crates/rome_cli/src/service/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ async fn try_connect() -> io::Result<NamedPipeClient> {
const CREATE_NEW_PROCESS_GROUP: u32 = 0x00000200;

/// Spawn the daemon server process in the background
fn spawn_daemon(no_timeout: bool) -> io::Result<()> {
fn spawn_daemon(is_oneshot: bool) -> io::Result<()> {
let binary = env::current_exe()?;

let mut cmd = Command::new(binary);
cmd.arg("__run_server");

if no_timeout {
cmd.arg("--no-timeout");
if is_oneshot {
cmd.arg("--oneshot");
}

cmd.creation_flags(CREATE_NEW_PROCESS_GROUP);
Expand Down Expand Up @@ -168,14 +168,14 @@ impl AsyncWrite for ClientWriteHalf {
///
/// Returns false if the daemon process was already running or true if it had
/// to be started
pub(crate) async fn ensure_daemon(no_timeout: bool) -> io::Result<bool> {
pub(crate) async fn ensure_daemon(is_oneshot: bool) -> io::Result<bool> {
let mut did_spawn = false;

loop {
match open_socket().await {
Ok(Some(_)) => break,
Ok(None) => {
spawn_daemon(no_timeout)?;
spawn_daemon(is_oneshot)?;
did_spawn = true;
time::sleep(Duration::from_millis(50)).await;
}
Expand All @@ -189,7 +189,7 @@ pub(crate) async fn ensure_daemon(no_timeout: bool) -> io::Result<bool> {
/// Ensure the server daemon is running and ready to receive connections and
/// print the global pipe name in the standard output
pub(crate) async fn print_socket() -> io::Result<()> {
ensure_daemon(false).await?;
ensure_daemon(true).await?;
println!("{}", get_pipe_name());
Ok(())
}
Expand Down
61 changes: 33 additions & 28 deletions crates/rome_lsp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ use rome_fs::CONFIG_NAME;
use rome_service::workspace::{RageEntry, RageParams, RageResult};
use rome_service::{workspace, Workspace};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::Notify;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tower_lsp::jsonrpc::Result as LspResult;
use tower_lsp::{lsp_types::*, ClientSocket};
use tower_lsp::{LanguageServer, LspService, Server};
Expand All @@ -27,16 +25,25 @@ pub struct LSPServer {
/// Map of all sessions connected to the same [ServerFactory] as this [LSPServer].
sessions: Sessions,
/// If this is true the server will broadcast a shutdown signal once the
/// last client disconnected after a short timeout
has_timeout: bool,
/// last client disconnected
is_oneshot: bool,
/// This shared flag is set to true once at least one sessions has been
/// initialized on this server instance
is_initialized: Arc<AtomicBool>,
}

impl LSPServer {
fn new(session: SessionHandle, sessions: Sessions, has_timeout: bool) -> Self {
fn new(
session: SessionHandle,
sessions: Sessions,
is_oneshot: bool,
is_initialized: Arc<AtomicBool>,
) -> Self {
Self {
session,
sessions,
has_timeout,
is_oneshot,
is_initialized,
}
}

Expand Down Expand Up @@ -181,6 +188,7 @@ impl LanguageServer for LSPServer {
#[tracing::instrument(level = "trace", skip(self))]
async fn initialize(&self, params: InitializeParams) -> LspResult<InitializeResult> {
info!("Starting Rome Language Server...");
self.is_initialized.store(true, Ordering::Relaxed);

self.session
.client_capabilities
Expand Down Expand Up @@ -340,27 +348,14 @@ impl Drop for LSPServer {
let _removed = sessions.remove(&self.session.key);
debug_assert!(_removed.is_some(), "Session did not exist.");

if self.has_timeout && sessions.is_empty() {
tokio::spawn(server_timeout(
self.session.cancellation.clone(),
self.sessions.clone(),
));
if self.is_oneshot && sessions.is_empty() && self.is_initialized.load(Ordering::Relaxed)
{
self.session.cancellation.notify_one();
}
}
}
}

/// Broadcast the shutdown signal after one minute if the server has no active session
async fn server_timeout(cancellation: Arc<Notify>, sessions: Sessions) {
sleep(Duration::from_secs(60)).await;

if !sessions.lock().unwrap().is_empty() {
return;
}

cancellation.notify_one();
}

/// Map of active sessions connected to a [ServerFactory].
type Sessions = Arc<Mutex<HashMap<SessionKey, SessionHandle>>>;

Expand All @@ -384,8 +379,11 @@ pub struct ServerFactory {
next_session_key: AtomicU64,

/// If this is true the server will broadcast a shutdown signal once the
/// last client disconnected after a short timeout
has_timeout: bool,
/// last client disconnected
is_oneshot: bool,
/// This shared flag is set to true once at least one sessions has been
/// initialized on this server instance
is_initialized: Arc<AtomicBool>,
}

/// Helper method for wrapping a [Workspace] method in a `custom_method` for
Expand Down Expand Up @@ -423,13 +421,14 @@ macro_rules! workspace_method {
}

impl ServerFactory {
pub fn new(has_timeout: bool) -> Self {
pub fn new(is_oneshot: bool) -> Self {
Self {
cancellation: Arc::default(),
workspace: None,
sessions: Sessions::default(),
next_session_key: AtomicU64::new(0),
has_timeout,
is_oneshot,
is_initialized: Arc::default(),
}
}

Expand All @@ -448,7 +447,13 @@ impl ServerFactory {

let mut sessions = self.sessions.lock().unwrap();
sessions.insert(session_key, handle.clone());
LSPServer::new(handle, self.sessions.clone(), self.has_timeout)

LSPServer::new(
handle,
self.sessions.clone(),
self.is_oneshot,
self.is_initialized.clone(),
)
});

builder = builder.custom_method(SYNTAX_TREE_REQUEST, LSPServer::syntax_tree_request);
Expand Down

0 comments on commit 3d5820e

Please sign in to comment.