Skip to content

Commit

Permalink
tokio 1.x upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Dyas committed Mar 20, 2021
1 parent 60916e1 commit 0a4b387
Show file tree
Hide file tree
Showing 46 changed files with 520 additions and 557 deletions.
630 changes: 280 additions & 350 deletions src/rust/engine/Cargo.lock

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ default = []
async_latch = { path = "async_latch" }
async_semaphore = { path = "async_semaphore" }
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
concrete_time = { path = "concrete_time" }
cpython = "0.5"
crossbeam-channel = "0.4"
Expand All @@ -122,7 +122,7 @@ parking_lot = "0.11"
process_execution = { path = "process_execution" }
rand = "0.6"
regex = "1"
reqwest = { version = "0.10", default_features = false, features = ["stream", "rustls-tls"] }
reqwest = { version = "0.11", default_features = false, features = ["stream", "rustls-tls"] }
rule_graph = { path = "rule_graph" }
sharded_lmdb = { path = "sharded_lmdb" }
smallvec = "0.6"
Expand All @@ -132,7 +132,8 @@ task_executor = { path = "task_executor" }
tempfile = "3"
testutil_mock = { package = "mock", path = "testutil/mock" }
time = "0.1.40"
tokio = { version = "0.2.23", features = ["macros", "rt-threaded"] }
tokio = { version = "^1.4", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.6", features = ["io"] }
tryfuture = { path = "tryfuture" }
ui = { path = "ui" }
url = "2.1"
Expand All @@ -146,10 +147,5 @@ fs = { path = "./fs" }
env_logger = "0.5.4"

[patch.crates-io]
# Patch all transitive uses of Prost to refer to the version with `Bytes` support.
# Specifically, this Prost commit: https://github.com/danburkert/prost/commit/a1cccbcee343e2c444e1cd2738c7fba2599fc391
prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
prost-build = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
prost-types = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
# TODO: Posted as https://github.com/mitsuhiko/console/pull/93.
console = { git = "https://github.com/pantsbuild/console", rev = "b6e9aa7ce734517691934d558d79a459609633db" }
4 changes: 2 additions & 2 deletions src/rust/engine/async_latch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false

[dependencies]
parking_lot = "0.11"
tokio = { version = "0.2.23", features = ["sync"] }
tokio = { version = "^1.4", features = ["sync"] }

[dev-dependencies]
tokio = { version = "0.2.23", features = ["macros"] }
tokio = { version = "^1.4", features = ["rt", "macros", "time"] }
2 changes: 1 addition & 1 deletion src/rust/engine/async_latch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl AsyncLatch {
// To see whether the latch is triggered, we clone the receiver, and then wait for our clone to
// return None, indicating that the Sender has been dropped.
let mut receiver = self.receiver.clone();
while receiver.recv().await.is_some() {}
while receiver.changed().await.is_ok() {}
}

///
Expand Down
9 changes: 4 additions & 5 deletions src/rust/engine/async_latch/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::AsyncLatch;

use std::time::Duration;

use tokio;
use tokio::time::delay_for;
use tokio::time::sleep;

use crate::AsyncLatch;

#[tokio::test]
async fn basic() {
Expand All @@ -16,7 +15,7 @@ async fn basic() {

// Ensure that `triggered` doesn't return until `trigger` has been called.
tokio::select! {
_ = delay_for(Duration::from_secs(1)) => {},
_ = sleep(Duration::from_secs(1)) => {},
_ = &mut join => { panic!("Background task should have continued to wait.") }
}
latch.trigger();
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/async_semaphore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ publish = false

[dependencies]
parking_lot = "0.11"
tokio = { version = "0.2.23", features = ["sync"] }
tokio = { version = "^1.4", features = ["sync"] }

[dev-dependencies]
futures = "0.3"
tokio = { version = "0.2.23", features = ["rt-core", "macros", "time"] }
tokio = { version = "^1.4", features = ["rt", "macros", "time"] }
2 changes: 1 addition & 1 deletion src/rust/engine/async_semaphore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl AsyncSemaphore {
}

async fn acquire(&self) -> Permit<'_> {
let permit = self.inner.sema.acquire().await;
let permit = self.inner.sema.acquire().await.expect("semaphore closed");
let id = {
let mut available_ids = self.inner.available_ids.lock();
available_ids
Expand Down
38 changes: 18 additions & 20 deletions src/rust/engine/async_semaphore/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::AsyncSemaphore;

use std::time::Duration;

use futures::channel::oneshot;
use futures::future::{self, FutureExt};
use tokio::time::{sleep, timeout};

use tokio;
use tokio::time::{delay_for, timeout};
use crate::AsyncSemaphore;

#[tokio::test]
async fn acquire_and_release() {
Expand All @@ -28,27 +26,27 @@ async fn correct_semaphore_slot_ids() {
//Process 1
tokio::spawn(sema.clone().with_acquired(move |id| async move {
tx1.send(id).unwrap();
delay_for(2 * scale).await;
sleep(2 * scale).await;
future::ready(())
}));
//Process 2
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(1 * scale).await;
sleep(scale).await;
tx2.send(id).unwrap();
future::ready(())
}));
//Process 3
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(1 * scale).await;
sleep(scale).await;
tx3.send(id).unwrap();
future::ready(())
}));

delay_for(5 * scale).await;
sleep(5 * scale).await;

//Process 4
tokio::spawn(sema.clone().with_acquired(move |id| async move {
delay_for(1 * scale).await;
sleep(scale).await;
tx4.send(id).unwrap();
future::ready(())
}));
Expand Down Expand Up @@ -82,49 +80,49 @@ async fn correct_semaphore_slot_ids_2() {
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 1");
tx1.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 2");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 2");
tx2.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 3");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 3");
tx3.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 4");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 4");
tx4.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 5");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 5");
tx5.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 6");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 6");
tx6.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));
println!("Spawning process 7");
tokio::spawn(sema.clone().with_acquired(move |id| async move {
println!("Exec process 7");
tx7.send(id).unwrap();
delay_for(Duration::from_millis(20)).await;
sleep(Duration::from_millis(20)).await;
future::ready(())
}));

Expand Down Expand Up @@ -176,7 +174,7 @@ async fn at_most_n_acquisitions() {

// thread2 should not signal until we unblock thread1.
let acquired_thread2 =
match future::select(delay_for(Duration::from_millis(100)), acquired_thread2).await {
match future::select(sleep(Duration::from_millis(100)).boxed(), acquired_thread2).await {
future::Either::Left((_, acquired_thread2)) => acquired_thread2,
future::Either::Right(_) => {
panic!("thread2 should not have acquired while thread1 was holding.")
Expand Down Expand Up @@ -238,7 +236,7 @@ async fn drop_while_waiting() {
// thread2 will wait for a little while, but then drop its PermitFuture to give up on waiting.
tokio::spawn(async move {
let permit_future = handle2.acquire().boxed();
let delay_future = delay_for(Duration::from_millis(100));
let delay_future = sleep(Duration::from_millis(100)).boxed();
let raced_result = future::select(delay_future, permit_future).await;
// We expect to have timed out, because the other Future will not resolve until asked.
match raced_result {
Expand Down Expand Up @@ -296,15 +294,15 @@ async fn dropped_future_is_removed_from_queue() {
}
let waiter = handle2.with_acquired(|_id| future::ready(()));
let join_handle2 = tokio::spawn(async move {
match future::select(delay_for(Duration::from_millis(100)), waiter.boxed()).await {
match future::select(sleep(Duration::from_millis(100)).boxed(), waiter.boxed()).await {
future::Either::Left(((), waiter_future)) => {
tx_thread2.send(()).unwrap();
rx_thread2.await.unwrap();
drop(waiter_future);
()
}
future::Either::Right(_) => {
panic!("The delay_for result should always be ready first!");
panic!("The sleep result should always be ready first!");
}
}
});
Expand Down
5 changes: 4 additions & 1 deletion src/rust/engine/async_value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ publish = false

[dependencies]
futures = "0.3"
tokio = { version = "0.2.23", features = ["sync"] }
tokio = { version = "^1.4", features = ["macros", "sync"] }

[dev-dependencies]
tokio = { version = "^1.4", features = ["macros", "rt", "sync", "time"] }
15 changes: 7 additions & 8 deletions src/rust/engine/async_value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ impl<T: Clone + Send + Sync + 'static> AsyncValueReceiver<T> {
pub async fn recv(&self) -> Option<T> {
let mut item_receiver = (*self.item_receiver).clone();
loop {
match item_receiver.recv().await {
Some(None) => {
// Observing the initial value of the channel.
continue;
}
Some(t) => break t,
None => break None,
if let Some(ref value) = *item_receiver.borrow() {
return Some(value.clone());
}

if item_receiver.changed().await.is_err() {
return None;
}
}
}
Expand All @@ -110,7 +109,7 @@ pub struct AsyncValueSender<T: Clone + Send + Sync + 'static> {

impl<T: Clone + Send + Sync + 'static> AsyncValueSender<T> {
pub fn send(self, item: T) {
let _ = self.item_sender.broadcast(Some(item));
let _ = self.item_sender.send(Some(item));
}

pub async fn closed(&mut self) {
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/async_value/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::AsyncValue;
use std::time::Duration;

use tokio;
use tokio::time::delay_for;
use tokio::time::sleep;

#[tokio::test]
async fn send() {
Expand All @@ -21,7 +21,7 @@ async fn cancel_explicit() {

// Ensure that a value is not received.
tokio::select! {
_ = delay_for(Duration::from_secs(1)) => {},
_ = sleep(Duration::from_secs(1)) => {},
_ = receiver.recv() => { panic!("Should have continued to wait.") }
}

Expand All @@ -39,7 +39,7 @@ async fn cancel_implicit() {

// Ensure that a value is not received.
tokio::select! {
_ = delay_for(Duration::from_secs(1)) => {},
_ = sleep(Duration::from_secs(1)) => {},
_ = receiver.recv() => { panic!("Should have continued to wait.") }
}

Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/concrete_time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ name = "concrete_time"
publish = false

[dependencies]
prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
prost-types = "0.6"
prost = "0.7"
prost-types = "0.7"
serde_derive = "1.0.98"
serde = "1.0.98"
log = "0.4"
4 changes: 2 additions & 2 deletions src/rust/engine/fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false

[dependencies]
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
dirs-next = "2"
futures = "0.3"
glob = "0.2.11"
Expand All @@ -21,4 +21,4 @@ task_executor = { path = "../task_executor" }
[dev-dependencies]
tempfile = "3"
testutil = { path = "../testutil" }
tokio = { version = "0.2.23", features = ["rt-core", "macros"] }
tokio = { version = "^1.4", features = ["rt", "macros"] }
4 changes: 2 additions & 2 deletions src/rust/engine/fs/brfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ protobuf = { version = "2.0.6", features = ["with-bytes"] }
store = { path = "../store" }
task_executor = { path = "../../task_executor" }
time = "0.1.39"
tokio = { version = "0.2.23", features = ["rt-threaded", "macros", "signal", "stream"] }
tokio = { version = "^1.4", features = ["rt-multi-thread", "macros", "signal"] }
workunit_store = { path = "../../workunit_store" }

[dev-dependencies]
bytes = "0.5"
bytes = "1.0"
tempfile = "3"
testutil = { path = "../../testutil" }
6 changes: 3 additions & 3 deletions src/rust/engine/fs/fs_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ publish = false

[dependencies]
bazel_protos = { path = "../../process_execution/bazel_protos" }
bytes = "0.5"
bytes = "1.0"
clap = "2"
env_logger = "0.5.4"
grpc_util = { path = "../../grpc_util" }
fs = { path = ".." }
futures = "0.3"
hashing = { path = "../../hashing" }
parking_lot = "0.11"
prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" }
prost = "0.7"
rand = "0.6"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
store = { path = "../store" }
task_executor = { path = "../../task_executor" }
tokio = { version = "0.2.23", features = ["rt-threaded", "macros"] }
tokio = { version = "^1.4", features = ["rt-multi-thread", "macros"] }
workunit_store = { path = "../../workunit_store" }
Loading

0 comments on commit 0a4b387

Please sign in to comment.