Skip to content
This repository has been archived by the owner on Dec 29, 2022. It is now read-only.

Use Tokio 0.2 #1713

Merged
merged 1 commit into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
637 changes: 220 additions & 417 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ cargo = { git = "https://github.com/rust-lang/cargo", rev = "329895f5b52a358e5d9
cargo_metadata = "0.8"
clippy_lints = { git = "https://github.com/rust-lang/rust-clippy", rev = "7ea7cd165ad6705603852771bf82cc2fd6560db5", optional = true }
env_logger = "0.7"
futures = { version = "0.1", optional = true }
home = "0.5.1"
itertools = "0.8"
jsonrpc-core = "14"
jsonrpc-core = "17"
lsp-types = { version = "0.60", features = ["proposed"] }
lazy_static = "1"
log = "0.4"
Expand All @@ -51,7 +50,6 @@ serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
serde_ignored = "0.1"
tokio = { version = "0.1", optional = true }
url = "2"
walkdir = "2"
regex = "1"
Expand All @@ -68,16 +66,15 @@ rustc-workspace-hack = "1.0.0"
[dev-dependencies]
difference = "2"
tempfile = "3"
lsp-codec = "0.1.2"
tokio = "0.1"
futures = "0.1"
tokio-process = "0.2"
tokio-timer = "0.2"
lsp-codec = "0.2"
tokio = { version = "0.2", default-features = false, features = ["rt-core", "time", "io-util", "process", "rt-util"] }
tokio-util = { version = "0.3", default-features = false, features = ["codec"] }
futures = "0.3"

[build-dependencies]
rustc_tools_util = "0.2"

[features]
clippy = ["clippy_lints", "rls-rustc/clippy"]
ipc = ["tokio", "futures", "rls-rustc/ipc", "rls-ipc/server"]
ipc = ["rls-rustc/ipc", "rls-ipc/server"]
default = ["ipc"]
9 changes: 4 additions & 5 deletions rls-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ repository = "https://github.com/rust-lang/rls"
categories = ["development-tools"]

[dependencies]
jsonrpc-core = "14"
jsonrpc-core-client = "14"
jsonrpc-derive = "14"
# Pin 14.0.3 to use single parity-tokio-ipc version (0.2)
jsonrpc-ipc-server = { version = "=14.0.3", optional = true }
jsonrpc-core = "17"
jsonrpc-core-client = "17"
jsonrpc-derive = "17"
jsonrpc-ipc-server = { version = "17", optional = true }
rls-data = "0.19"
serde = { version = "1.0", features = ["derive"] }

Expand Down
4 changes: 2 additions & 2 deletions rls-rustc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ env_logger = "0.7"
log = "0.4"
rand = "0.7"
clippy_lints = { git = "https://github.com/rust-lang/rust-clippy", rev = "d236b30a1d638340aad8345fa2946cfe9543dcf0", optional = true }
tokio = { version = "0.1", optional = true }
futures = { version = "0.1", optional = true }
tokio = { version = "0.2", optional = true }
futures = { version = "0.3", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
rls-data = { version = "0.19", optional = true }
rls-ipc = { path = "../rls-ipc", optional = true }
Expand Down
13 changes: 5 additions & 8 deletions rls-rustc/src/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::io;
use std::path::{Path, PathBuf};

use futures::Future;

use rls_ipc::client::{Client as JointClient, RpcChannel, RpcError};
use rls_ipc::rpc::callbacks::Client as CallbacksClient;
use rls_ipc::rpc::file_loader::Client as FileLoaderClient;
Expand All @@ -30,13 +29,11 @@ impl IpcFileLoader {

impl rustc_span::source_map::FileLoader for IpcFileLoader {
fn file_exists(&self, path: &Path) -> bool {
self.0.file_exists(path.to_owned()).wait().unwrap()
futures::executor::block_on(self.0.file_exists(path.to_owned())).unwrap()
}

fn read_file(&self, path: &Path) -> io::Result<String> {
self.0
.read_file(path.to_owned())
.wait()
futures::executor::block_on(self.0.read_file(path.to_owned()))
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))
}
}
Expand All @@ -48,14 +45,14 @@ impl IpcCallbacks {
pub fn complete_analysis(
&self,
analysis: rls_data::Analysis,
) -> impl Future<Item = (), Error = RpcError> {
) -> impl Future<Output = Result<(), RpcError>> {
self.0.complete_analysis(analysis)
}

pub fn input_files(
&self,
input_files: HashMap<PathBuf, HashSet<rls_ipc::rpc::Crate>>,
) -> impl Future<Item = (), Error = RpcError> {
) -> impl Future<Output = Result<(), RpcError>> {
self.0.input_files(input_files)
}
}
Expand Down
18 changes: 7 additions & 11 deletions rls-rustc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ pub fn run() -> Result<(), ()> {
#[cfg(feature = "ipc")]
let (mut shim_calls, file_loader) = match std::env::var("RLS_IPC_ENDPOINT").ok() {
Some(endpoint) => {
#[allow(deprecated)] // Windows doesn't work with lazily-bound reactors
let reactor = rt.reactor().clone();
let connection =
ipc::connect(endpoint, &reactor).expect("Couldn't connect to IPC endpoint");
let client: ipc::Client =
rt.block_on(connection).expect("Couldn't connect to IPC endpoint");
let client: ipc::Client = rt
.block_on(async { ipc::connect(endpoint).await })
.expect("Couldn't connect to IPC endpoint");
let (file_loader, callbacks) = client.split();

(
Expand Down Expand Up @@ -113,7 +110,6 @@ impl Callbacks for ShimCalls {
) -> Compilation {
use rustc_session::config::Input;

use futures::future::Future;
use rls_ipc::rpc::{Crate, Edition};
use std::collections::{HashMap, HashSet};

Expand Down Expand Up @@ -149,7 +145,7 @@ impl Callbacks for ShimCalls {
input_files.entry(file).or_default().insert(krate.clone());
}

if let Err(e) = callbacks.input_files(input_files).wait() {
if let Err(e) = futures::executor::block_on(callbacks.input_files(input_files)) {
log::error!("Can't send input files as part of a compilation callback: {:?}", e);
}

Expand All @@ -162,8 +158,6 @@ impl Callbacks for ShimCalls {
compiler: &interface::Compiler,
queries: &'tcx Queries<'tcx>,
) -> Compilation {
use futures::future::Future;

let callbacks = match self.callbacks.as_ref() {
Some(callbacks) => callbacks,
None => return Compilation::Continue,
Expand Down Expand Up @@ -196,7 +190,9 @@ impl Callbacks for ShimCalls {
CallbackHandler {
callback: &mut |a| {
let analysis = unsafe { ::std::mem::transmute(a.clone()) };
if let Err(e) = callbacks.complete_analysis(analysis).wait() {
if let Err(e) =
futures::executor::block_on(callbacks.complete_analysis(analysis))
{
log::error!(
"Can't send analysis as part of a compilation callback: {:?}",
e
Expand Down
6 changes: 0 additions & 6 deletions rls/src/build/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,7 @@ pub fn start_with_handler(io: IoHandler) -> Result<Server, ()> {
let endpoint_path = endpoint_path.clone();
move || {
log::trace!("Attempting to spin up IPC server at {}", endpoint_path);
let runtime = tokio::runtime::Builder::new().core_threads(1).build().unwrap();
#[allow(deprecated)] // Windows won't work with lazily bound reactor
let (reactor, executor) = (runtime.reactor(), runtime.executor());

let server = ServerBuilder::new(io)
.event_loop_executor(executor)
.event_loop_reactor(reactor.clone())
.start(&endpoint_path)
.map_err(|_| log::warn!("Couldn't open socket"))
.unwrap();
Expand Down
21 changes: 13 additions & 8 deletions tests/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::fs;
use std::path::Path;
use std::time::{Duration, Instant};
use std::time::Duration;

use futures::future::Future;
use futures::future;
use lsp_types::{notification::*, request::*, *};
use serde::de::Deserialize;
use serde_json::json;
Expand Down Expand Up @@ -245,7 +245,8 @@ fn client_changing_workspace_lib_retains_diagnostics() {

let lib = rls.future_diagnostics("library/src/lib.rs");
let bin = rls.future_diagnostics("binary/src/main.rs");
let (lib, bin) = rls.block_on(lib.join(bin)).unwrap();
let (lib, bin) = rls.block_on(future::join(lib, bin)).unwrap();
let (lib, bin) = (lib.unwrap(), bin.unwrap());

assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `test_val`")));
assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `unused`")));
Expand All @@ -268,7 +269,8 @@ fn client_changing_workspace_lib_retains_diagnostics() {

let lib = rls.future_diagnostics("library/src/lib.rs");
let bin = rls.future_diagnostics("binary/src/main.rs");
let (lib, bin) = rls.block_on(lib.join(bin)).unwrap();
let (lib, bin) = rls.block_on(future::join(lib, bin)).unwrap();
let (lib, bin) = (lib.unwrap(), bin.unwrap());

// lib unit tests have compile errors
assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `unused`")));
Expand All @@ -293,7 +295,8 @@ fn client_changing_workspace_lib_retains_diagnostics() {

let lib = rls.future_diagnostics("library/src/lib.rs");
let bin = rls.future_diagnostics("binary/src/main.rs");
let (lib, bin) = rls.block_on(lib.join(bin)).unwrap();
let (lib, bin) = rls.block_on(future::join(lib, bin)).unwrap();
let (lib, bin) = (lib.unwrap(), bin.unwrap());

assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `test_val`")));
assert!(lib.diagnostics.iter().any(|m| m.message.contains("unused variable: `unused`")));
Expand Down Expand Up @@ -349,6 +352,7 @@ fn client_implicit_workspace_pick_up_lib_changes() {

let bin = rls.future_diagnostics("src/main.rs");
let bin = rls.block_on(bin).unwrap();
let bin = bin.unwrap();
assert!(bin.diagnostics[0].message.contains("unused variable: `val`"));

rls.notify::<DidChangeTextDocument>(DidChangeTextDocumentParams {
Expand All @@ -369,6 +373,7 @@ fn client_implicit_workspace_pick_up_lib_changes() {
// bin depending on lib picks up type mismatch
let bin = rls.future_diagnostics("src/main.rs");
let bin = rls.block_on(bin).unwrap();
let bin = bin.unwrap();
assert!(bin.diagnostics[0].message.contains("cannot find function `foo`"));

rls.notify::<DidChangeTextDocument>(DidChangeTextDocumentParams {
Expand All @@ -388,6 +393,7 @@ fn client_implicit_workspace_pick_up_lib_changes() {

let bin = rls.future_diagnostics("src/main.rs");
let bin = rls.block_on(bin).unwrap();
let bin = bin.unwrap();
assert!(bin.diagnostics[0].message.contains("unused variable: `val`"));
}

Expand Down Expand Up @@ -1971,7 +1977,7 @@ fn client_omit_init_build() {
// We need to assert that no other messages are received after a short
// period of time (e.g. no build progress messages).
std::thread::sleep(std::time::Duration::from_secs(1));
rls.block_on(response).unwrap();
rls.block_on(response).unwrap().unwrap();

assert_eq!(rls.messages().iter().count(), 1);
}
Expand Down Expand Up @@ -2141,8 +2147,7 @@ fn client_fail_uninitialized_request() {
},
);

let delay = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(1));
rls.block_on(delay).unwrap();
rls.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }).unwrap();

let err = jsonrpc_core::Failure::deserialize(rls.messages().last().unwrap()).unwrap();
assert_eq!(err.id, jsonrpc_core::Id::Num(ID));
Expand Down
52 changes: 29 additions & 23 deletions tests/support/client/child_process.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,61 @@
use std::io::{Read, Write};
use std::io;
use std::pin::Pin;
use std::process::{Command, Stdio};
use std::rc::Rc;
use std::task::{Context, Poll};

use futures::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_process::{Child, CommandExt};

pub struct ChildProcess {
stdin: tokio_process::ChildStdin,
stdout: tokio_process::ChildStdout,
child: Rc<tokio_process::Child>,
stdin: tokio::process::ChildStdin,
stdout: tokio::process::ChildStdout,
child: Rc<tokio::process::Child>,
}

impl Read for ChildProcess {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
Read::read(&mut self.stdout, buf)
impl AsyncRead for ChildProcess {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.stdout).poll_read(cx, buf)
}
}

impl Write for ChildProcess {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Write::write(&mut self.stdin, buf)
impl AsyncWrite for ChildProcess {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.stdin).poll_write(cx, buf)
}
fn flush(&mut self) -> std::io::Result<()> {
Write::flush(&mut self.stdin)

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.stdin).poll_flush(cx)
}
}

impl AsyncRead for ChildProcess {}
impl AsyncWrite for ChildProcess {
fn shutdown(&mut self) -> Poll<(), std::io::Error> {
AsyncWrite::shutdown(&mut self.stdin)
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.stdin).poll_shutdown(cx)
}
}

impl ChildProcess {
pub fn spawn_from_command(mut cmd: Command) -> Result<ChildProcess, std::io::Error> {
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
let mut child = cmd.spawn_async()?;
let mut child = tokio::process::Command::from(cmd).spawn().expect("to async spawn process");

Ok(ChildProcess {
stdout: child.stdout().take().unwrap(),
stdin: child.stdin().take().unwrap(),
stdout: child.stdout.take().unwrap(),
stdin: child.stdin.take().unwrap(),
child: Rc::new(child),
})
}

/// Returns a handle to the underlying `Child` process.
/// Useful when waiting until child process exits.
pub fn child(&self) -> Rc<Child> {
pub fn child(&self) -> Rc<tokio::process::Child> {
Rc::clone(&self.child)
}
}
Loading