Skip to content

Commit

Permalink
Auto merge of rust-lang#3631 - RalfJung:blocking-refactor, r=RalfJung
Browse files Browse the repository at this point in the history
completely refactor how we manage blocking and unblocking threads

This hides a lot of invariants from the implementation of the synchronization primitives, and makes sure we never have to release or acquire a vector clock on another thread but the active one.
  • Loading branch information
bors committed May 26, 2024
2 parents 8e861c6 + 2e89443 commit 0963353
Show file tree
Hide file tree
Showing 19 changed files with 951 additions and 1,043 deletions.
14 changes: 5 additions & 9 deletions src/tools/miri/src/alloc_addresses/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,10 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
size,
align,
memory_kind,
ecx.get_active_thread(),
ecx.active_thread(),
) {
if let Some(clock) = clock
&& let Some(data_race) = &ecx.machine.data_race
{
data_race.acquire_clock(&clock, ecx.get_active_thread());
if let Some(clock) = clock {
ecx.acquire_clock(&clock);
}
reuse_addr
} else {
Expand Down Expand Up @@ -369,12 +367,10 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> {
// `alloc_id_from_addr` any more.
global_state.exposed.remove(&dead_id);
// Also remember this address for future reuse.
let thread = self.threads.get_active_thread_id();
let thread = self.threads.active_thread();
global_state.reuse.add_addr(rng, addr, size, align, kind, thread, || {
if let Some(data_race) = &self.data_race {
data_race
.release_clock(thread, self.threads.active_thread_ref().current_span())
.clone()
data_race.release_clock(&self.threads).clone()
} else {
VClock::default()
}
Expand Down
152 changes: 69 additions & 83 deletions src/tools/miri/src/concurrency/data_race.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use std::{
};

use rustc_ast::Mutability;
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
use rustc_data_structures::fx::FxHashSet;
use rustc_index::{Idx, IndexVec};
use rustc_middle::{mir, ty::Ty};
use rustc_span::Span;
Expand Down Expand Up @@ -822,6 +822,26 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
assert!(!old, "cannot nest allow_data_races");
}
}

/// Returns the `release` clock of the current thread.
/// Other threads can acquire this clock in the future to establish synchronization
/// with this program point.
fn release_clock<'a>(&'a self) -> Option<Ref<'a, VClock>>
where
'mir: 'a,
{
let this = self.eval_context_ref();
Some(this.machine.data_race.as_ref()?.release_clock(&this.machine.threads))
}

/// Acquire the given clock into the current thread, establishing synchronization with
/// the moment when that clock snapshot was taken via `release_clock`.
fn acquire_clock(&self, clock: &VClock) {
let this = self.eval_context_ref();
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(clock, &this.machine.threads);
}
}
}

/// Vector clock metadata for a logical memory allocation.
Expand Down Expand Up @@ -1412,13 +1432,6 @@ pub struct GlobalState {
/// active vector-clocks catch up with the threads timestamp.
reuse_candidates: RefCell<FxHashSet<VectorIdx>>,

/// This contains threads that have terminated, but not yet joined
/// and so cannot become re-use candidates until a join operation
/// occurs.
/// The associated vector index will be moved into re-use candidates
/// after the join operation occurs.
terminated_threads: RefCell<FxHashMap<ThreadId, VectorIdx>>,

/// The timestamp of last SC fence performed by each thread
last_sc_fence: RefCell<VClock>,

Expand Down Expand Up @@ -1446,7 +1459,6 @@ impl GlobalState {
vector_info: RefCell::new(IndexVec::new()),
thread_info: RefCell::new(IndexVec::new()),
reuse_candidates: RefCell::new(FxHashSet::default()),
terminated_threads: RefCell::new(FxHashMap::default()),
last_sc_fence: RefCell::new(VClock::default()),
last_sc_write: RefCell::new(VClock::default()),
track_outdated_loads: config.track_outdated_loads,
Expand Down Expand Up @@ -1480,8 +1492,6 @@ impl GlobalState {
fn find_vector_index_reuse_candidate(&self) -> Option<VectorIdx> {
let mut reuse = self.reuse_candidates.borrow_mut();
let vector_clocks = self.vector_clocks.borrow();
let vector_info = self.vector_info.borrow();
let terminated_threads = self.terminated_threads.borrow();
for &candidate in reuse.iter() {
let target_timestamp = vector_clocks[candidate].clock[candidate];
if vector_clocks.iter_enumerated().all(|(clock_idx, clock)| {
Expand All @@ -1491,9 +1501,7 @@ impl GlobalState {

// The vector represents a thread that has terminated and hence cannot
// report a data-race with the candidate index.
let thread_id = vector_info[clock_idx];
let vector_terminated =
reuse.contains(&clock_idx) || terminated_threads.contains_key(&thread_id);
let vector_terminated = reuse.contains(&clock_idx);

// The vector index cannot report a race with the candidate index
// and hence allows the candidate index to be re-used.
Expand Down Expand Up @@ -1583,55 +1591,38 @@ impl GlobalState {
/// thread (the joinee, the thread that someone waited on) and the current thread (the joiner,
/// the thread who was waiting).
#[inline]
pub fn thread_joined(
&mut self,
thread_mgr: &ThreadManager<'_, '_>,
joiner: ThreadId,
joinee: ThreadId,
) {
let clocks_vec = self.vector_clocks.get_mut();
let thread_info = self.thread_info.get_mut();

// Load the vector clock of the current thread.
let current_index = thread_info[joiner]
.vector_index
.expect("Performed thread join on thread with no assigned vector");
let current = &mut clocks_vec[current_index];
pub fn thread_joined(&mut self, threads: &ThreadManager<'_, '_>, joinee: ThreadId) {
let thread_info = self.thread_info.borrow();
let thread_info = &thread_info[joinee];

// Load the associated vector clock for the terminated thread.
let join_clock = thread_info[joinee]
let join_clock = thread_info
.termination_vector_clock
.as_ref()
.expect("Joined with thread but thread has not terminated");

// The join thread happens-before the current thread
// so update the current vector clock.
// Is not a release operation so the clock is not incremented.
current.clock.join(join_clock);
.expect("joined with thread but thread has not terminated");
// Acquire that into the current thread.
self.acquire_clock(join_clock, threads);

// Check the number of live threads, if the value is 1
// then test for potentially disabling multi-threaded execution.
if thread_mgr.get_live_thread_count() == 1 {
// May potentially be able to disable multi-threaded execution.
let current_clock = &clocks_vec[current_index];
if clocks_vec
.iter_enumerated()
.all(|(idx, clocks)| clocks.clock[idx] <= current_clock.clock[idx])
{
// All thread terminations happen-before the current clock
// therefore no data-races can be reported until a new thread
// is created, so disable multi-threaded execution.
self.multi_threaded.set(false);
// This has to happen after `acquire_clock`, otherwise there'll always
// be some thread that has not synchronized yet.
if let Some(current_index) = thread_info.vector_index {
if threads.get_live_thread_count() == 1 {
let vector_clocks = self.vector_clocks.get_mut();
// May potentially be able to disable multi-threaded execution.
let current_clock = &vector_clocks[current_index];
if vector_clocks
.iter_enumerated()
.all(|(idx, clocks)| clocks.clock[idx] <= current_clock.clock[idx])
{
// All thread terminations happen-before the current clock
// therefore no data-races can be reported until a new thread
// is created, so disable multi-threaded execution.
self.multi_threaded.set(false);
}
}
}

// If the thread is marked as terminated but not joined
// then move the thread to the re-use set.
let termination = self.terminated_threads.get_mut();
if let Some(index) = termination.remove(&joinee) {
let reuse = self.reuse_candidates.get_mut();
reuse.insert(index);
}
}

/// On thread termination, the vector-clock may re-used
Expand All @@ -1642,29 +1633,18 @@ impl GlobalState {
/// This should be called strictly before any calls to
/// `thread_joined`.
#[inline]
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>, current_span: Span) {
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>) {
let current_thread = thread_mgr.active_thread();
let current_index = self.active_thread_index(thread_mgr);

// Increment the clock to a unique termination timestamp.
let vector_clocks = self.vector_clocks.get_mut();
let current_clocks = &mut vector_clocks[current_index];
current_clocks.increment_clock(current_index, current_span);

// Load the current thread id for the executing vector.
let vector_info = self.vector_info.get_mut();
let current_thread = vector_info[current_index];

// Load the current thread metadata, and move to a terminated
// vector state. Setting up the vector clock all join operations
// will use.
let thread_info = self.thread_info.get_mut();
let current = &mut thread_info[current_thread];
current.termination_vector_clock = Some(current_clocks.clock.clone());
// Store the terminaion clock.
let terminaion_clock = self.release_clock(thread_mgr).clone();
self.thread_info.get_mut()[current_thread].termination_vector_clock =
Some(terminaion_clock);

// Add this thread as a candidate for re-use after a thread join
// occurs.
let termination = self.terminated_threads.get_mut();
termination.insert(current_thread, current_index);
// Add this thread's clock index as a candidate for re-use.
let reuse = self.reuse_candidates.get_mut();
reuse.insert(current_index);
}

/// Attempt to perform a synchronized operation, this
Expand Down Expand Up @@ -1702,23 +1682,29 @@ impl GlobalState {
format!("thread `{thread_name}`")
}

/// Acquire the given clock into the given thread, establishing synchronization with
/// Acquire the given clock into the current thread, establishing synchronization with
/// the moment when that clock snapshot was taken via `release_clock`.
/// As this is an acquire operation, the thread timestamp is not
/// incremented.
pub fn acquire_clock(&self, lock: &VClock, thread: ThreadId) {
pub fn acquire_clock<'mir, 'tcx>(&self, clock: &VClock, threads: &ThreadManager<'mir, 'tcx>) {
let thread = threads.active_thread();
let (_, mut clocks) = self.thread_state_mut(thread);
clocks.clock.join(lock);
clocks.clock.join(clock);
}

/// Returns the `release` clock of the given thread.
/// Returns the `release` clock of the current thread.
/// Other threads can acquire this clock in the future to establish synchronization
/// with this program point.
pub fn release_clock(&self, thread: ThreadId, current_span: Span) -> Ref<'_, VClock> {
pub fn release_clock<'mir, 'tcx>(
&self,
threads: &ThreadManager<'mir, 'tcx>,
) -> Ref<'_, VClock> {
let thread = threads.active_thread();
let span = threads.active_thread_ref().current_span();
// We increment the clock each time this happens, to ensure no two releases
// can be confused with each other.
let (index, mut clocks) = self.thread_state_mut(thread);
clocks.increment_clock(index, current_span);
clocks.increment_clock(index, span);
drop(clocks);
// To return a read-only view, we need to release the RefCell
// and borrow it again.
Expand Down Expand Up @@ -1757,7 +1743,7 @@ impl GlobalState {
&self,
thread_mgr: &ThreadManager<'_, '_>,
) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
self.thread_state(thread_mgr.get_active_thread_id())
self.thread_state(thread_mgr.active_thread())
}

/// Load the current vector clock in use and the current set of thread clocks
Expand All @@ -1767,14 +1753,14 @@ impl GlobalState {
&self,
thread_mgr: &ThreadManager<'_, '_>,
) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
self.thread_state_mut(thread_mgr.get_active_thread_id())
self.thread_state_mut(thread_mgr.active_thread())
}

/// Return the current thread, should be the same
/// as the data-race active thread.
#[inline]
fn active_thread_index(&self, thread_mgr: &ThreadManager<'_, '_>) -> VectorIdx {
let active_thread_id = thread_mgr.get_active_thread_id();
let active_thread_id = thread_mgr.active_thread();
self.thread_index(active_thread_id)
}

Expand Down
Loading

0 comments on commit 0963353

Please sign in to comment.