Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganise std::unstable::mutex to add an RAII unlocker to the mutex & replace LittleLock #12235

Closed
wants to merge 7 commits into from
32 changes: 15 additions & 17 deletions src/libgreen/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop};
use std::rt::task::BlockedTask;
use std::rt::task::Task;
use std::sync::deque;
use std::unstable::mutex::Mutex;
use std::unstable::mutex::NativeMutex;
use std::unstable::raw;

use TaskState;
Expand Down Expand Up @@ -669,8 +669,7 @@ impl Scheduler {
// is acquired here. This is the resumption points and the "bounce"
// that it is referring to.
unsafe {
current_task.nasty_deschedule_lock.lock();
current_task.nasty_deschedule_lock.unlock();
let _guard = current_task.nasty_deschedule_lock.lock();
}
return current_task;
}
Expand Down Expand Up @@ -765,10 +764,11 @@ impl Scheduler {
// to it, but we're guaranteed that the task won't exit until we've
// unlocked the lock so there's no worry of this memory going away.
let cur = self.change_task_context(cur, next, |sched, mut task| {
let lock: *mut Mutex = &mut task.nasty_deschedule_lock;
unsafe { (*lock).lock() }
f(sched, BlockedTask::block(task.swap()));
unsafe { (*lock).unlock() }
let lock: *mut NativeMutex = &mut task.nasty_deschedule_lock;
unsafe {
let _guard = (*lock).lock();
f(sched, BlockedTask::block(task.swap()));
}
});
cur.put();
}
Expand Down Expand Up @@ -1453,8 +1453,8 @@ mod test {

#[test]
fn test_spawn_sched_blocking() {
use std::unstable::mutex::{Mutex, MUTEX_INIT};
static mut LOCK: Mutex = MUTEX_INIT;
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;

// Testing that a task in one scheduler can block in foreign code
// without affecting other schedulers
Expand All @@ -1466,12 +1466,11 @@ mod test {
let mut handle = pool.spawn_sched();
handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() {
unsafe {
LOCK.lock();
let mut guard = LOCK.lock();

start_ch.send(());
LOCK.wait(); // block the scheduler thread
LOCK.signal(); // let them know we have the lock
LOCK.unlock();
guard.wait(); // block the scheduler thread
guard.signal(); // let them know we have the lock
}

fin_ch.send(());
Expand Down Expand Up @@ -1503,10 +1502,9 @@ mod test {
child_ch.send(20);
pingpong(&parent_po, &child_ch);
unsafe {
LOCK.lock();
LOCK.signal(); // wakeup waiting scheduler
LOCK.wait(); // wait for them to grab the lock
LOCK.unlock();
let mut guard = LOCK.lock();
guard.signal(); // wakeup waiting scheduler
guard.wait(); // wait for them to grab the lock
}
})));
drop(handle);
Expand Down
10 changes: 5 additions & 5 deletions src/libgreen/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use std::rt::local::Local;
use std::rt::rtio;
use std::rt::task::{Task, BlockedTask};
use std::task::TaskOpts;
use std::unstable::sync::LittleLock;
use std::unstable::mutex::NativeMutex;

struct SimpleTask {
lock: LittleLock,
lock: NativeMutex,
awoken: bool,
}

Expand Down Expand Up @@ -59,9 +59,9 @@ impl Runtime for SimpleTask {
to_wake.put_runtime(self as ~Runtime);
unsafe {
cast::forget(to_wake);
let _l = (*me).lock.lock();
let mut guard = (*me).lock.lock();
(*me).awoken = true;
(*me).lock.signal();
guard.signal();
}
}

Expand All @@ -83,7 +83,7 @@ impl Runtime for SimpleTask {
pub fn task() -> ~Task {
let mut task = ~Task::new();
task.put_runtime(~SimpleTask {
lock: LittleLock::new(),
lock: unsafe {NativeMutex::new()},
awoken: false,
} as ~Runtime);
return task;
Expand Down
17 changes: 5 additions & 12 deletions src/libgreen/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::rt::local::Local;
use std::rt::rtio;
use std::rt::task::{Task, BlockedTask, SendMessage};
use std::task::TaskOpts;
use std::unstable::mutex::Mutex;
use std::unstable::mutex::NativeMutex;
use std::unstable::raw;

use context::Context;
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct GreenTask {
pool_id: uint,

// See the comments in the scheduler about why this is necessary
nasty_deschedule_lock: Mutex,
nasty_deschedule_lock: NativeMutex,
}

pub enum TaskType {
Expand Down Expand Up @@ -163,7 +163,7 @@ impl GreenTask {
task_type: task_type,
sched: None,
handle: None,
nasty_deschedule_lock: unsafe { Mutex::new() },
nasty_deschedule_lock: unsafe { NativeMutex::new() },
task: Some(~Task::new()),
}
}
Expand Down Expand Up @@ -322,11 +322,10 @@ impl GreenTask {
// uncontended except for when the task is rescheduled).
fn reawaken_remotely(mut ~self) {
unsafe {
let mtx = &mut self.nasty_deschedule_lock as *mut Mutex;
let mtx = &mut self.nasty_deschedule_lock as *mut NativeMutex;
let handle = self.handle.get_mut_ref() as *mut SchedHandle;
(*mtx).lock();
let _guard = (*mtx).lock();
(*handle).send(RunOnce(self));
(*mtx).unlock();
}
}
}
Expand Down Expand Up @@ -479,12 +478,6 @@ impl Runtime for GreenTask {
fn wrap(~self) -> ~Any { self as ~Any }
}

impl Drop for GreenTask {
fn drop(&mut self) {
unsafe { self.nasty_deschedule_lock.destroy(); }
}
}

#[cfg(test)]
mod tests {
use std::rt::Runtime;
Expand Down
18 changes: 9 additions & 9 deletions src/libnative/bookkeeping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
//! The green counterpart for this is bookkeeping on sched pools.

use std::sync::atomics;
use std::unstable::mutex::{Mutex, MUTEX_INIT};
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};

static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
static mut TASK_LOCK: Mutex = MUTEX_INIT;
static mut TASK_LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;

pub fn increment() {
let _ = unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst) };
Expand All @@ -29,9 +29,8 @@ pub fn increment() {
pub fn decrement() {
unsafe {
if TASK_COUNT.fetch_sub(1, atomics::SeqCst) == 1 {
TASK_LOCK.lock();
TASK_LOCK.signal();
TASK_LOCK.unlock();
let mut guard = TASK_LOCK.lock();
guard.signal();
}
}
}
Expand All @@ -40,11 +39,12 @@ pub fn decrement() {
/// the entry points of native programs
pub fn wait_for_other_tasks() {
unsafe {
TASK_LOCK.lock();
while TASK_COUNT.load(atomics::SeqCst) > 0 {
TASK_LOCK.wait();
{
let mut guard = TASK_LOCK.lock();
while TASK_COUNT.load(atomics::SeqCst) > 0 {
guard.wait();
}
}
TASK_LOCK.unlock();
TASK_LOCK.destroy();
}
}
7 changes: 3 additions & 4 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,18 @@ pub fn init() {
}

unsafe {
use std::unstable::mutex::{Mutex, MUTEX_INIT};
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut INITIALIZED: bool = false;
static mut LOCK: Mutex = MUTEX_INIT;
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;

LOCK.lock();
let _guard = LOCK.lock();
if !INITIALIZED {
let mut data: WSADATA = mem::init();
let ret = WSAStartup(0x202, // version 2.2
&mut data);
assert_eq!(ret, 0);
INITIALIZED = true;
}
LOCK.unlock();
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/libnative/io/timer_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use std::cast;
use std::rt;
use std::unstable::mutex::{Mutex, MUTEX_INIT};
use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};

use bookkeeping;
use io::timer::{Req, Shutdown};
Expand All @@ -37,11 +37,11 @@ static mut HELPER_CHAN: *mut Chan<Req> = 0 as *mut Chan<Req>;
static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;

pub fn boot(helper: fn(imp::signal, Port<Req>)) {
static mut LOCK: Mutex = MUTEX_INIT;
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
static mut INITIALIZED: bool = false;

unsafe {
LOCK.lock();
let mut _guard = LOCK.lock();
if !INITIALIZED {
let (msgp, msgc) = Chan::new();
// promote this to a shared channel
Expand All @@ -58,7 +58,6 @@ pub fn boot(helper: fn(imp::signal, Port<Req>)) {
rt::at_exit(proc() { shutdown() });
INITIALIZED = true;
}
LOCK.unlock();
}
}

Expand Down
27 changes: 9 additions & 18 deletions src/libnative/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::rt::task::{Task, BlockedTask, SendMessage};
use std::rt::thread::Thread;
use std::rt;
use std::task::TaskOpts;
use std::unstable::mutex::Mutex;
use std::unstable::mutex::NativeMutex;
use std::unstable::stack;

use io;
Expand All @@ -40,7 +40,7 @@ pub fn new(stack_bounds: (uint, uint)) -> ~Task {

fn ops() -> ~Ops {
~Ops {
lock: unsafe { Mutex::new() },
lock: unsafe { NativeMutex::new() },
awoken: false,
io: io::IoFactory::new(),
// these *should* get overwritten
Expand Down Expand Up @@ -109,7 +109,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) {
// This structure is the glue between channels and the 1:1 scheduling mode. This
// structure is allocated once per task.
struct Ops {
lock: Mutex, // native synchronization
lock: NativeMutex, // native synchronization
awoken: bool, // used to prevent spurious wakeups
io: io::IoFactory, // local I/O factory

Expand Down Expand Up @@ -191,20 +191,19 @@ impl rt::Runtime for Ops {
let task = BlockedTask::block(cur_task);

if times == 1 {
(*me).lock.lock();
let mut guard = (*me).lock.lock();
(*me).awoken = false;
match f(task) {
Ok(()) => {
while !(*me).awoken {
(*me).lock.wait();
guard.wait();
}
}
Err(task) => { cast::forget(task.wake()); }
}
(*me).lock.unlock();
} else {
let mut iter = task.make_selectable(times);
(*me).lock.lock();
let mut guard = (*me).lock.lock();
(*me).awoken = false;
let success = iter.all(|task| {
match f(task) {
Expand All @@ -216,9 +215,8 @@ impl rt::Runtime for Ops {
}
});
while success && !(*me).awoken {
(*me).lock.wait();
guard.wait();
}
(*me).lock.unlock();
}
// re-acquire ownership of the task
cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
Expand All @@ -235,10 +233,9 @@ impl rt::Runtime for Ops {
let me = &mut *self as *mut Ops;
to_wake.put_runtime(self as ~rt::Runtime);
cast::forget(to_wake);
(*me).lock.lock();
let mut guard = (*me).lock.lock();
(*me).awoken = true;
(*me).lock.signal();
(*me).lock.unlock();
guard.signal();
}
}

Expand All @@ -254,12 +251,6 @@ impl rt::Runtime for Ops {
}
}

impl Drop for Ops {
fn drop(&mut self) {
unsafe { self.lock.destroy() }
}
}

#[cfg(test)]
mod tests {
use std::rt::Runtime;
Expand Down
6 changes: 3 additions & 3 deletions src/librustuv/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use std::cast;
use std::libc::{c_void, c_int};
use std::rt::task::BlockedTask;
use std::unstable::sync::LittleLock;
use std::unstable::mutex::NativeMutex;
use std::sync::arc::UnsafeArc;
use mpsc = std::sync::mpsc_queue;

Expand All @@ -39,7 +39,7 @@ enum Message {

struct State {
handle: *uvll::uv_async_t,
lock: LittleLock, // see comments in async_cb for why this is needed
lock: NativeMutex, // see comments in async_cb for why this is needed
queue: mpsc::Queue<Message>,
}

Expand Down Expand Up @@ -112,7 +112,7 @@ impl QueuePool {
let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
let state = UnsafeArc::new(State {
handle: handle,
lock: LittleLock::new(),
lock: unsafe {NativeMutex::new()},
queue: mpsc::Queue::new(),
});
let q = ~QueuePool {
Expand Down
Loading