Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to Tokio v1.x ecosystem #11475

Merged
merged 6 commits into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
590 changes: 241 additions & 349 deletions src/rust/engine/Cargo.lock

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ default = []
async_latch = { path = "async_latch" }
async_semaphore = { path = "async_semaphore" }
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
concrete_time = { path = "concrete_time" }
cpython = "0.5"
crossbeam-channel = "0.4"
Expand All @@ -122,7 +122,7 @@ parking_lot = "0.11"
process_execution = { path = "process_execution" }
rand = "0.6"
regex = "1"
reqwest = { version = "0.10", default_features = false, features = ["stream", "rustls-tls"] }
reqwest = { version = "0.11", default_features = false, features = ["stream", "rustls-tls"] }
rule_graph = { path = "rule_graph" }
sharded_lmdb = { path = "sharded_lmdb" }
smallvec = "0.6"
Expand All @@ -132,7 +132,8 @@ task_executor = { path = "task_executor" }
tempfile = "3"
testutil_mock = { package = "mock", path = "testutil/mock" }
time = "0.1.40"
tokio = { version = "0.2.23", features = ["macros", "rt-threaded"] }
tokio = { version = "1.4", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.6", features = ["io"] }
tryfuture = { path = "tryfuture" }
ui = { path = "ui" }
url = "2.1"
Expand All @@ -146,10 +147,5 @@ fs = { path = "./fs" }
env_logger = "0.5.4"

[patch.crates-io]
# Patch all transitive uses of Prost to refer to the version with `Bytes` support.
# Specifically, this Prost commit: https://github.com/danburkert/prost/commit/a1cccbcee343e2c444e1cd2738c7fba2599fc391
prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
prost-build = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
prost-types = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
# TODO: Posted as https://github.com/mitsuhiko/console/pull/93.
console = { git = "https://github.com/pantsbuild/console", rev = "b6e9aa7ce734517691934d558d79a459609633db" }
4 changes: 2 additions & 2 deletions src/rust/engine/async_latch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false

[dependencies]
parking_lot = "0.11"
tokio = { version = "0.2.23", features = ["sync"] }
tokio = { version = "1.4", features = ["sync"] }

[dev-dependencies]
tokio = { version = "0.2.23", features = ["macros"] }
tokio = { version = "1.4", features = ["rt", "macros", "time"] }
4 changes: 2 additions & 2 deletions src/rust/engine/async_latch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ impl AsyncLatch {
///
pub async fn triggered(&self) {
// To see whether the latch is triggered, we clone the receiver, and then wait for our clone to
// return None, indicating that the Sender has been dropped.
// return an Err, indicating that the Sender has been dropped.
let mut receiver = self.receiver.clone();
while receiver.recv().await.is_some() {}
while receiver.changed().await.is_ok() {}
}

///
Expand Down
9 changes: 4 additions & 5 deletions src/rust/engine/async_latch/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::AsyncLatch;

use std::time::Duration;

use tokio;
use tokio::time::delay_for;
use tokio::time::sleep;

use crate::AsyncLatch;

#[tokio::test]
async fn basic() {
Expand All @@ -16,7 +15,7 @@ async fn basic() {

// Ensure that `triggered` doesn't return until `trigger` has been called.
tokio::select! {
_ = delay_for(Duration::from_secs(1)) => {},
_ = sleep(Duration::from_secs(1)) => {},
_ = &mut join => { panic!("Background task should have continued to wait.") }
}
latch.trigger();
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/async_semaphore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ publish = false

[dependencies]
parking_lot = "0.11"
tokio = { version = "0.2.23", features = ["sync"] }
tokio = { version = "1.4", features = ["sync"] }

[dev-dependencies]
futures = "0.3"
tokio = { version = "0.2.23", features = ["rt-core", "macros", "time"] }
tokio = { version = "1.4", features = ["rt", "macros", "time"] }
2 changes: 1 addition & 1 deletion src/rust/engine/async_semaphore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl AsyncSemaphore {
}

async fn acquire(&self) -> Permit<'_> {
let permit = self.inner.sema.acquire().await;
let permit = self.inner.sema.acquire().await.expect("semaphore closed");
let id = {
let mut available_ids = self.inner.available_ids.lock();
available_ids
Expand Down
38 changes: 18 additions & 20 deletions src/rust/engine/async_semaphore/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::AsyncSemaphore;

use std::time::Duration;

use futures::channel::oneshot;
use futures::future::{self, FutureExt};
use tokio::time::{sleep, timeout};

use tokio;
use tokio::time::{delay_for, timeout};
use crate::AsyncSemaphore;

#[tokio::test]
async fn acquire_and_release() {
Expand All @@ -28,27 +26,27 @@ async fn correct_semaphore_slot_ids() {
//Process 1
tokio::spawn(sema.clone().with_acquired(move |id| async move {
tx1.send(id).unwrap();
delay_for(2 * scale).await;
sleep(2 * scale).await;
future::ready(())
}));
//Process 2
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(1 * scale).await;
sleep(scale).await;
tx2.send(id).unwrap();
future::ready(())
}));
//Process 3
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(1 * scale).await;
sleep(scale).await;
tx3.send(id).unwrap();
future::ready(())
}));

delay_for(5 * scale).await;
sleep(5 * scale).await;

//Process 4
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(1 * scale).await;
sleep(scale).await;
tx4.send(id).unwrap();
future::ready(())
}));
Expand Down Expand Up @@ -82,49 +80,49 @@ async fn correct_semaphore_slot_ids_2() {
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 1");
tx1.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 2");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 2");
tx2.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 3");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 3");
tx3.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 4");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 4");
tx4.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 5");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 5");
tx5.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 6");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 6");
tx6.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 7");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 7");
tx7.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));

Expand Down Expand Up @@ -176,7 +174,7 @@ async fn at_most_n_acquisitions() {

// thread2 should not signal until we unblock thread1.
let acquired_thread2 =
match future::select(delay_for(Duration::from_millis(100)), acquired_thread2).await {
match future::select(sleep(Duration::from_millis(100)).boxed(), acquired_thread2).await {
future::Either::Left((_, acquired_thread2)) => acquired_thread2,
future::Either::Right(_) => {
panic!("thread2 should not have acquired while thread1 was holding.")
Expand Down Expand Up @@ -238,7 +236,7 @@ async fn drop_while_waiting() {
// thread2 will wait for a little while, but then drop its PermitFuture to give up on waiting.
tokio::spawn(async move {
let permit_future = handle2.acquire().boxed();
let delay_future = delay_for(Duration::from_millis(100));
let delay_future = sleep(Duration::from_millis(100)).boxed();
let raced_result = future::select(delay_future, permit_future).await;
// We expect to have timed out, because the other Future will not resolve until asked.
match raced_result {
Expand Down Expand Up @@ -296,15 +294,15 @@ async fn dropped_future_is_removed_from_queue() {
}
let waiter = handle2.with_acquired(|_id| future::ready(()));
let join_handle2 = tokio::spawn(async move {
match future::select(delay_for(Duration::from_millis(100)), waiter.boxed()).await {
match future::select(sleep(Duration::from_millis(100)).boxed(), waiter.boxed()).await {
future::Either::Left(((), waiter_future)) => {
tx_thread2.send(()).unwrap();
rx_thread2.await.unwrap();
drop(waiter_future);
()
}
future::Either::Right(_) => {
panic!("The delay_for result should always be ready first!");
panic!("The sleep result should always be ready first!");
}
}
});
Expand Down
5 changes: 4 additions & 1 deletion src/rust/engine/async_value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ publish = false

[dependencies]
futures = "0.3"
tokio = { version = "0.2.23", features = ["sync"] }
tokio = { version = "1.4", features = ["macros", "sync"] }

[dev-dependencies]
tokio = { version = "1.4", features = ["macros", "rt", "sync", "time"] }
15 changes: 7 additions & 8 deletions src/rust/engine/async_value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ impl<T: Clone + Send + Sync + 'static> AsyncValueReceiver<T> {
pub async fn recv(&self) -> Option<T> {
let mut item_receiver = (*self.item_receiver).clone();
loop {
match item_receiver.recv().await {
Some(None) => {
// Observing the initial value of the channel.
continue;
}
Some(t) => break t,
None => break None,
if let Some(ref value) = *item_receiver.borrow() {
return Some(value.clone());
}

if item_receiver.changed().await.is_err() {
return None;
}
}
}
Expand All @@ -110,7 +109,7 @@ pub struct AsyncValueSender<T: Clone + Send + Sync + 'static> {

impl<T: Clone + Send + Sync + 'static> AsyncValueSender<T> {
pub fn send(self, item: T) {
let _ = self.item_sender.broadcast(Some(item));
let _ = self.item_sender.send(Some(item));
}

pub async fn closed(&mut self) {
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/async_value/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::AsyncValue;
use std::time::Duration;

use tokio;
use tokio::time::delay_for;
use tokio::time::sleep;

#[tokio::test]
async fn send() {
Expand All @@ -21,7 +21,7 @@ async fn cancel_explicit() {

// Ensure that a value is not received.
tokio::select! {
_ = delay_for(Duration::from_secs(1)) => {},
_ = sleep(Duration::from_secs(1)) => {},
_ = receiver.recv() => { panic!("Should have continued to wait.") }
}

Expand All @@ -39,7 +39,7 @@ async fn cancel_implicit() {

// Ensure that a value is not received.
tokio::select! {
_ = delay_for(Duration::from_secs(1)) => {},
_ = sleep(Duration::from_secs(1)) => {},
_ = receiver.recv() => { panic!("Should have continued to wait.") }
}

Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/concrete_time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ name = "concrete_time"
publish = false

[dependencies]
prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
prost-types = "0.6"
prost = "0.7"
prost-types = "0.7"
serde_derive = "1.0.98"
serde = "1.0.98"
log = "0.4"
4 changes: 2 additions & 2 deletions src/rust/engine/fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false

[dependencies]
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
dirs-next = "2"
futures = "0.3"
glob = "0.2.11"
Expand All @@ -21,4 +21,4 @@ task_executor = { path = "../task_executor" }
[dev-dependencies]
tempfile = "3"
testutil = { path = "../testutil" }
tokio = { version = "0.2.23", features = ["rt-core", "macros"] }
tokio = { version = "1.4", features = ["rt", "macros"] }
5 changes: 3 additions & 2 deletions src/rust/engine/fs/brfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ protobuf = { version = "2.0.6", features = ["with-bytes"] }
store = { path = "../store" }
task_executor = { path = "../../task_executor" }
time = "0.1.39"
tokio = { version = "0.2.23", features = ["rt-threaded", "macros", "signal", "stream"] }
tokio = { version = "1.4", features = ["rt-multi-thread", "macros", "signal"] }
tokio-stream = { version = "0.1", features = ["signal"] }
workunit_store = { path = "../../workunit_store" }

[dev-dependencies]
bytes = "0.5"
bytes = "1.0"
tempfile = "3"
testutil = { path = "../../testutil" }
10 changes: 6 additions & 4 deletions src/rust/engine/fs/brfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ use log::{debug, error, warn};
use parking_lot::Mutex;
use store::Store;
use tokio::signal::unix::{signal, SignalKind};
use tokio::stream::StreamExt;
use tokio::task;
use tokio_stream::wrappers::SignalStream;
use tokio_stream::StreamExt;

const TTL: time::Timespec = time::Timespec { sec: 0, nsec: 0 };

Expand Down Expand Up @@ -760,9 +761,10 @@ async fn main() {
where
F: Fn() -> SignalKind,
{
signal(install_fn())
.unwrap_or_else(|_| panic!("Failed to install SIG{:?} handler", sig))
.map(move |_| Some(sig))
SignalStream::new(
signal(install_fn()).unwrap_or_else(|_| panic!("Failed to install SIG{:?} handler", sig)),
)
.map(move |_| Some(sig))
}

let sigint = install_handler(SignalKind::interrupt, Sig::INT);
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/fs/fs_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ publish = false

[dependencies]
bazel_protos = { path = "../../process_execution/bazel_protos" }
bytes = "0.5"
bytes = "1.0"
clap = "2"
env_logger = "0.5.4"
grpc_util = { path = "../../grpc_util" }
fs = { path = ".." }
futures = "0.3"
hashing = { path = "../../hashing" }
parking_lot = "0.11"
prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
prost = "0.7"
rand = "0.6"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
store = { path = "../store" }
task_executor = { path = "../../task_executor" }
tokio = { version = "0.2.23", features = ["rt-threaded", "macros"] }
tokio = { version = "1.4", features = ["rt-multi-thread", "macros"] }
workunit_store = { path = "../../workunit_store" }
Loading