Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to crossbeam-queue for events #1091

Merged
merged 2 commits into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions analysis/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ bincode = "1.0.1"
once_cell = "1"
enum_dispatch = "0.3"
fs-err = "2"
crossbeam-queue = "0.3"
crossbeam-utils = "0.8"
5 changes: 5 additions & 0 deletions analysis/runtime/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ use crate::events::{Event, EventKind};
use crate::mir_loc::MirLocId;
use crate::runtime::global_runtime::RUNTIME;

// WARNING! Most handlers in this file may be called from a signal handler,
// so they and all their callees should be signal-safe.
ahomescu marked this conversation as resolved.
Show resolved Hide resolved
// Signal handlers are generally not supposed to call memory allocation
// functions, so those do not need to be signal-safe.

/// A hook function (see [`HOOK_FUNCTIONS`]).
///
/// Instruments 64-bit `c2rust transpile`d `malloc`, which is similar to `libc::malloc`.
Expand Down
24 changes: 21 additions & 3 deletions analysis/runtime/src/runtime/backend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crossbeam_queue::ArrayQueue;
use crossbeam_utils::Backoff;
use enum_dispatch::enum_dispatch;
use fs_err::{File, OpenOptions};
use std::fmt::Debug;
use std::io::{stderr, BufWriter, Write};
use std::sync::Arc;

use bincode;

Expand Down Expand Up @@ -80,18 +83,33 @@ pub enum Backend {
}

impl Backend {
fn write_all(&mut self, events: impl IntoIterator<Item = Event>) {
for event in events {
fn write_all(&mut self, events: Arc<ArrayQueue<Event>>) {
let backoff = Backoff::new();
loop {
let event = match events.pop() {
Some(event) => event,
None => {
// We can't block on a lock/semaphore here since
// it might not be safe for the event sender to wake
// us from inside a signal handler
backoff.snooze();
continue;
}
};

let done = matches!(event.kind, EventKind::Done);
self.write(event);
if done {
break;
}

// Reset the backoff timer since we got an event
backoff.reset();
}
self.flush();
}

pub fn run(&mut self, events: impl IntoIterator<Item = Event>) {
pub fn run(&mut self, events: Arc<ArrayQueue<Event>>) {
let (lock, cvar) = &*FINISHED;
let mut finished = lock.lock().unwrap();
ahomescu marked this conversation as resolved.
Show resolved Hide resolved
self.write_all(events);
Expand Down
5 changes: 5 additions & 0 deletions analysis/runtime/src/runtime/global_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,15 @@ impl GlobalRuntime {
///
/// It also silently drops the [`Event`] if the [`ScopedRuntime`]
/// has been [`ScopedRuntime::finalize`]d/[`GlobalRuntime::finalize`]d.
///
/// May be called from a signal handler, so it needs to be async-signal-safe.
pub fn send_event(&self, event: Event) {
// # Async-signal-safety: OnceCell::get() is just a dereference
match self.runtime.get() {
None => {
// Silently drop the [`Event`] as the [`ScopedRuntime`] isn't ready/initialized yet.
//
// # Async-signal-safety: `skip_event(_, BeforeMain)` is async-signal-safe.
skip_event(event, SkipReason::BeforeMain);
}
Some(runtime) => {
Expand Down
48 changes: 37 additions & 11 deletions analysis/runtime/src/runtime/scoped_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::{
sync::{
mpsc::{self, SyncSender},
Mutex,
},
sync::{Arc, Mutex},
thread,
};

use crossbeam_queue::ArrayQueue;
use crossbeam_utils::Backoff;
use enum_dispatch::enum_dispatch;
use once_cell::sync::OnceCell;

Expand Down Expand Up @@ -104,6 +103,8 @@ impl ExistingRuntime for MainThreadRuntime {
self.backend.lock().unwrap().flush();
}

// # Async-signal-safety: NOT SAFE!!!
// Do not use this with programs that install signal handlers.
fn send_event(&self, event: Event) {
self.backend.lock().unwrap().write(event);
}
Expand All @@ -117,16 +118,36 @@ impl Runtime for MainThreadRuntime {
}

pub struct BackgroundThreadRuntime {
tx: SyncSender<Event>,
tx: Arc<ArrayQueue<Event>>,
finalized: OnceCell<()>,
}

impl BackgroundThreadRuntime {
fn push_event(&self, mut event: Event, can_sleep: bool) {
// # Async-signal-safety: This needs `can_sleep == false` if called from
// a signal handler; in that case, it spins instead of sleeping
// which should be safe. `ArrayQueue::push` is backed by a fixed-size
// array so it does not allocate.
let backoff = Backoff::new();
while let Err(event_back) = self.tx.push(event) {
if can_sleep {
backoff.snooze();
} else {
// We have no choice but to spin here because
// we might be inside a signal handler
backoff.spin();
}
event = event_back;
}
}
}

impl ExistingRuntime for BackgroundThreadRuntime {
fn finalize(&self) {
// Only run the finalizer once.
self.finalized.get_or_init(|| {
// Notify the backend that we're done.
self.tx.send(Event::done()).unwrap();
self.push_event(Event::done(), true);

// Wait for the backend thread to finish.
let (lock, cvar) = &*FINISHED;
Expand All @@ -147,12 +168,16 @@ impl ExistingRuntime for BackgroundThreadRuntime {
fn send_event(&self, event: Event) {
match self.finalized.get() {
None => {
// `.unwrap()` as we're in no place to handle an error here,
// unless we should silently drop the [`Event`] instead.
self.tx.send(event).unwrap();
// # Async-signal-safety: `push_event` is safe if `can_sleep == false`
self.push_event(event, false);
}
Some(()) => {
// Silently drop the [`Event`] as the [`BackgroundThreadRuntime`] has already been [`BackgroundThreadRuntime::finalize`]d.
// Silently drop the [`Event`] as the [`BackgroundThreadRuntime`]
// has already been [`BackgroundThreadRuntime::finalize`]d.
//
// # Async-signal-safety: `skip_event(_, AfterMain)` is NOT SAFE;
// however, see the comment in `skip_event` for an explanation
// of why this will probably be okay in practice.
skip_event(event, SkipReason::AfterMain);
}
}
Expand All @@ -172,7 +197,8 @@ impl Runtime for BackgroundThreadRuntime {
/// Initialize the [`BackgroundThreadRuntime`], which includes [`thread::spawn`]ing,
/// so it must be run post-`main`.
fn try_init(mut backend: Backend) -> Result<Self, AnyError> {
let (tx, rx) = mpsc::sync_channel(1024);
let tx = Arc::new(ArrayQueue::new(1 << 20));
let rx = Arc::clone(&tx);
thread::spawn(move || backend.run(rx));
Ok(Self {
tx,
Expand Down
23 changes: 14 additions & 9 deletions analysis/runtime/src/runtime/skip.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{
fmt::{self, Display, Formatter},
sync::atomic::{AtomicU64, Ordering},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
};

use once_cell::sync::OnceCell;

use crate::events::Event;

#[derive(Debug)]
Expand All @@ -24,8 +22,7 @@ impl Display for SkipReason {
}

static EVENTS_SKIPPED_BEFORE_MAIN: AtomicU64 = AtomicU64::new(0);

static WARNED_AFTER_MAIN: OnceCell<()> = OnceCell::new();
static WARNED_AFTER_MAIN: AtomicBool = AtomicBool::new(false);

/// Notify the user if any [`Event`]s were skipped before `main`.
///
Expand All @@ -45,14 +42,22 @@ pub(super) fn skip_event(event: Event, reason: SkipReason) {
use SkipReason::*;
match reason {
BeforeMain => {
// # Async-signal-safety: atomic increments are safe.
EVENTS_SKIPPED_BEFORE_MAIN.fetch_add(1, Ordering::Relaxed);
}
AfterMain => {
// This is after `main`, so it's safe to use things like `eprintln!`,
// which uses `ReentrantMutex` internally, which may use `pthread` mutexes.
WARNED_AFTER_MAIN.get_or_init(|| {
// # Async-signal-safety: not really signal-safe, but if we
// get a signal after `main` ends, we're probably fine.
// The allocator should have enough free memory by now
// to not need to call `mmap`.
if !WARNED_AFTER_MAIN.swap(true, Ordering::Relaxed) {
// WARNED_AFTER_MAIN was previously `false` but we swapped it,
// which will happen exactly once per run so we can print now.
eprintln!("skipping {reason}");
});
}
// TODO: It would be nice to get rid of the two `eprintln`s here
// so we can guarantee signal safety, but then we would get no
// debugging output.
eprintln!("skipped event after `main`: {:?}", event.kind);
ahomescu marked this conversation as resolved.
Show resolved Hide resolved
}
};
Expand Down
Loading