Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zombie burying ritual #1264

Open
wants to merge 4 commits into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions src/language_server_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,6 @@ impl LanguageClient {
self.handle_cursor_moved(&Value::Null, true)?;

self.update_state(|state| {
state.clients.remove(&Some(language_id.into()));
state.last_cursor_line = 0;
state.text_documents.retain(|f, _| !f.starts_with(&root));
state.roots.remove(language_id);
Expand Down Expand Up @@ -2092,7 +2091,7 @@ impl LanguageClient {
Ok(())
}

#[tracing::instrument(level = "info", skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
pub fn text_document_publish_diagnostics(&self, params: &Value) -> Result<()> {
let params = PublishDiagnosticsParams::deserialize(params)?;
if !self.get_config(|c| c.diagnostics_enable)? {
Expand Down Expand Up @@ -3610,7 +3609,7 @@ impl LanguageClient {
Ok(())
})?;

let (child_id, reader, writer): (_, Box<dyn SyncRead>, Box<dyn SyncWrite>) =
let (child, reader, writer): (_, Box<dyn SyncRead>, Box<dyn SyncWrite>) =
if command.get(0).map(|c| c.starts_with("tcp://")) == Some(true) {
let addr = command
.get(0)
Expand Down Expand Up @@ -3642,7 +3641,7 @@ impl LanguageClient {
None => Stdio::null(),
};

let process = std::process::Command::new(
let mut process = std::process::Command::new(
command.get(0).ok_or_else(|| anyhow!("Empty command!"))?,
)
.args(&command[1..])
Expand All @@ -3653,7 +3652,6 @@ impl LanguageClient {
.spawn()
.with_context(|| format!("Failed to start language server ({:?})", command))?;

let child_id = Some(process.id());
let reader = Box::new(BufReader::new(
process
.stdout
Expand All @@ -3664,7 +3662,9 @@ impl LanguageClient {
.stdin
.ok_or_else(|| anyhow!("Failed to get subprocess stdin"))?,
));
(child_id, reader, writer)
process.stdout = None;
process.stdin = None;
(Some(process), reader, writer)
};

let lcn = self.clone();
Expand All @@ -3678,7 +3678,7 @@ impl LanguageClient {
Some(language_id.clone()),
reader,
writer,
child_id,
child,
self.get_state(|state| state.tx.clone())?,
on_server_crash,
)?;
Expand Down Expand Up @@ -3722,10 +3722,38 @@ impl LanguageClient {
if language_id.is_none() {
return Ok(());
}
let lang_id_real = language_id.clone().unwrap();

// avoid any racing of this cleanup with new client creation
let client_update_mutex = self.get_client_update_mutex(language_id.clone())?;
let _per_langid_client_lock = client_update_mutex.lock().map_err(|err|
anyhow!("Failed to lock cleanup of client for languageId {:?}: {:?}",
lang_id_real, err))?;

// must read out the child process' exit code -- lest it'll turn into a zombie!
self.update_state(|state| {
let client_ref = match state.clients.remove(language_id) {
Some(arc) => Ok(arc),
None => Err(anyhow!("Expected to have an RpcClient for {}, found None", lang_id_real)),
}?;
let mut client = Arc::try_unwrap(client_ref)
.map_err(|_| anyhow!("Arc::try_unwrap() unsuccessful return"))
.context("More than 1 reference to RpcClient")?;
if let Some(child) = client.child_process.as_mut() {
match child.try_wait() {
Ok(Some(exitcode)) => info!("Server process exited with {}", exitcode),
Ok(None) => warn!("No exitcode available, this should never happen."),
Err(e) => error!("Process wait failed: {}", e),
}
}
Ok(())
})
.map_err(|err| anyhow!(
"Could not cleanup leftover process for langId {:?}, leaving a zombie. {:?}",
language_id.clone().unwrap(), err))?;

// we don't want to restart if the server was shut down by the user, so check
// VIM_IS_SERVER_RUNNING as that should be true at this point only if the server exited
// unexpectedly.
// next we handle restarting the server -- unless of course it was intentionally
// shut down by the user, in which case VIM_IS_SERVER_RUNNING will be unset.
let filename = self.vim()?.get_filename(&Value::Null)?;
let is_running: u8 = self
.vim()?
Expand Down Expand Up @@ -3757,7 +3785,6 @@ impl LanguageClient {
restarts = 0;
};

state.clients.remove(language_id);
state.restarts.insert(language_id.clone(), restarts);
Ok(())
})?;
Expand All @@ -3772,9 +3799,9 @@ impl LanguageClient {

self.vim()?.echoerr("Server crashed, restarting client")?;
std::thread::sleep(Duration::from_millis(300 * (restarts as u64).pow(2)));
self.start_server(&json!({"languageId": language_id.clone().unwrap()}))?;
self.start_server(&json!({"languageId": lang_id_real}))?;
self.text_document_did_open(&json!({
"languageId": language_id.clone().unwrap(),
"languageId": lang_id_real,
"filename": filename,
}))?;

Expand Down Expand Up @@ -3846,7 +3873,7 @@ impl LanguageClient {
Ok(())
}

#[tracing::instrument(level = "info", skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
pub fn workspace_did_change_watched_files(&self, params: &Value) -> Result<()> {
let filename = self.vim()?.get_filename(params)?;
let language_id = self.vim()?.get_language_id(&filename, params)?;
Expand Down Expand Up @@ -3909,8 +3936,7 @@ impl LanguageClient {
state
.clients
.get(&Some(language_id.clone()))
.map(|c| c.process_id)
.unwrap_or_default(),
.and_then(|c| c.child_process.as_ref().map(|p| p.id()))
);
msg += &format!("Language server stderr: {}\n", server_stderr,);
msg += &format!("Log level: {}\n", state.logger.level);
Expand Down
8 changes: 5 additions & 3 deletions src/rpcclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::str::FromStr;
use std::{
collections::HashMap,
io::BufRead,
process::Child,
sync::atomic::{AtomicU64, Ordering},
thread,
time::Duration,
Expand All @@ -32,7 +33,8 @@ pub struct RpcClient {
writer_tx: Sender<RawMessage>,
#[serde(skip_serializing)]
reader_tx: Sender<(Id, Sender<jsonrpc_core::Output>)>,
pub process_id: Option<u32>,
#[serde(skip_serializing)] // FIXME
pub child_process: Option<Child>,
}

impl RpcClient {
Expand All @@ -41,7 +43,7 @@ impl RpcClient {
language_id: LanguageId,
reader: impl BufRead + Send + 'static,
writer: impl Write + Send + 'static,
process_id: Option<u32>,
child_process: Option<Child>,
sink: Sender<Call>,
on_crash: impl Fn(&LanguageId) + Clone + Send + 'static,
) -> Result<Self> {
Expand Down Expand Up @@ -86,7 +88,7 @@ impl RpcClient {
Ok(Self {
language_id,
id: AtomicU64::default(),
process_id,
child_process,
reader_tx,
writer_tx,
})
Expand Down