Skip to content

Commit

Permalink
chore: use the loom mutex wrapper everywhere (#3958)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn authored Jul 20, 2021
1 parent c8fc492 commit 549e89e
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 155 deletions.
10 changes: 5 additions & 5 deletions tokio/src/loom/std/atomic_u64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ mod imp {

#[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))]
mod imp {
use crate::loom::sync::Mutex;
use std::sync::atomic::Ordering;
use std::sync::Mutex;

#[derive(Debug)]
pub(crate) struct AtomicU64 {
Expand All @@ -31,15 +31,15 @@ mod imp {
}

pub(crate) fn load(&self, _: Ordering) -> u64 {
*self.inner.lock().unwrap()
*self.inner.lock()
}

pub(crate) fn store(&self, val: u64, _: Ordering) {
*self.inner.lock().unwrap() = val;
*self.inner.lock() = val;
}

pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 {
let mut lock = self.inner.lock().unwrap();
let mut lock = self.inner.lock();
let prev = *lock;
*lock = prev | val;
prev
Expand All @@ -52,7 +52,7 @@ mod imp {
_success: Ordering,
_failure: Ordering,
) -> Result<u64, u64> {
let mut lock = self.inner.lock().unwrap();
let mut lock = self.inner.lock();

if *lock == current {
*lock = new;
Expand Down
132 changes: 0 additions & 132 deletions tokio/src/runtime/shell.rs

This file was deleted.

7 changes: 3 additions & 4 deletions tokio/src/runtime/tests/loom_oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::loom::sync::{Arc, Mutex};
use loom::sync::Notify;

use std::sync::{Arc, Mutex};

pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
notify: Notify::new(),
Expand Down Expand Up @@ -31,15 +30,15 @@ struct Inner<T> {

impl<T> Sender<T> {
pub(crate) fn send(self, value: T) {
*self.inner.value.lock().unwrap() = Some(value);
*self.inner.value.lock() = Some(value);
self.inner.notify.notify();
}
}

impl<T> Receiver<T> {
pub(crate) fn recv(self) -> T {
loop {
if let Some(v) = self.inner.value.lock().unwrap().take() {
if let Some(v) = self.inner.value.lock().take() {
return v;
}

Expand Down
5 changes: 2 additions & 3 deletions tokio/src/sync/barrier.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::loom::sync::Mutex;
use crate::sync::watch;

use std::sync::Mutex;

/// A barrier enables multiple tasks to synchronize the beginning of some computation.
///
/// ```
Expand Down Expand Up @@ -94,7 +93,7 @@ impl Barrier {
// NOTE: the extra scope here is so that the compiler doesn't think `state` is held across
// a yield point, and thus marks the returned future as !Send.
let generation = {
let mut state = self.state.lock().unwrap();
let mut state = self.state.lock();
let generation = state.generation;
state.arrived += 1;
if state.arrived == self.n {
Expand Down
9 changes: 4 additions & 5 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Runs `!Send` futures on the current thread.
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Task};
use crate::sync::AtomicWaker;

Expand All @@ -8,7 +9,6 @@ use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Poll;

use pin_project_lite::pin_project;
Expand Down Expand Up @@ -537,7 +537,6 @@ impl LocalSet {
.shared
.queue
.lock()
.unwrap()
.pop_front()
.or_else(|| self.context.tasks.borrow_mut().queue.pop_front())
} else {
Expand All @@ -546,7 +545,7 @@ impl LocalSet {
.borrow_mut()
.queue
.pop_front()
.or_else(|| self.context.shared.queue.lock().unwrap().pop_front())
.or_else(|| self.context.shared.queue.lock().pop_front())
}
}

Expand Down Expand Up @@ -610,7 +609,7 @@ impl Drop for LocalSet {
task.shutdown();
}

for task in self.context.shared.queue.lock().unwrap().drain(..) {
for task in self.context.shared.queue.lock().drain(..) {
task.shutdown();
}

Expand Down Expand Up @@ -660,7 +659,7 @@ impl Shared {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
self.queue.lock().unwrap().push_back(task);
self.queue.lock().push_back(task);
self.waker.wake();
}
});
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ cfg_not_test_util! {

cfg_test_util! {
use crate::time::{Duration, Instant};
use std::sync::{Arc, Mutex};
use crate::loom::sync::{Arc, Mutex};

cfg_rt! {
fn clock() -> Option<Clock> {
Expand Down Expand Up @@ -102,7 +102,7 @@ cfg_test_util! {
/// runtime.
pub fn resume() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock().unwrap();
let mut inner = clock.inner.lock();

if inner.unfrozen.is_some() {
panic!("time is not frozen");
Expand Down Expand Up @@ -164,7 +164,7 @@ cfg_test_util! {
}

pub(crate) fn pause(&self) {
let mut inner = self.inner.lock().unwrap();
let mut inner = self.inner.lock();

if !inner.enable_pausing {
drop(inner); // avoid poisoning the lock
Expand All @@ -178,12 +178,12 @@ cfg_test_util! {
}

pub(crate) fn is_paused(&self) -> bool {
let inner = self.inner.lock().unwrap();
let inner = self.inner.lock();
inner.unfrozen.is_none()
}

pub(crate) fn advance(&self, duration: Duration) {
let mut inner = self.inner.lock().unwrap();
let mut inner = self.inner.lock();

if inner.unfrozen.is_some() {
panic!("time is not frozen");
Expand All @@ -193,7 +193,7 @@ cfg_test_util! {
}

pub(crate) fn now(&self) -> Instant {
let inner = self.inner.lock().unwrap();
let inner = self.inner.lock();

let mut ret = inner.base;

Expand Down

0 comments on commit 549e89e

Please sign in to comment.