Skip to content

Commit

Permalink
Switch to crossbeam-queue for events
Browse files Browse the repository at this point in the history
Use a lock-free queue to store the event list
because the event callbacks might be called from
inside a signal handler. Most system calls are not
safe in that context, so we just spin if the queue
is full.
  • Loading branch information
ahomescu committed May 4, 2024
1 parent 9511a4f commit 64faf2f
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 13 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions analysis/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ bincode = "1.0.1"
once_cell = "1"
enum_dispatch = "0.3"
fs-err = "2"
crossbeam-queue = "0.3"
18 changes: 15 additions & 3 deletions analysis/runtime/src/runtime/backend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crossbeam_queue::ArrayQueue;
use enum_dispatch::enum_dispatch;
use fs_err::{File, OpenOptions};
use std::fmt::Debug;
use std::io::{stderr, BufWriter, Write};
use std::sync::Arc;

use bincode;

Expand Down Expand Up @@ -80,8 +82,18 @@ pub enum Backend {
}

impl Backend {
fn write_all(&mut self, events: impl IntoIterator<Item = Event>) {
for event in events {
fn write_all(&mut self, events: Arc<ArrayQueue<Event>>) {
loop {
let event = match events.pop() {
Some(event) => event,
None => {
// We can't use anything with a futex here since
// the event sender might run inside a signal handler
std::thread::sleep(std::time::Duration::from_millis(50));
continue;
}
};

let done = matches!(event.kind, EventKind::Done);
self.write(event);
if done {
Expand All @@ -91,7 +103,7 @@ impl Backend {
self.flush();
}

pub fn run(&mut self, events: impl IntoIterator<Item = Event>) {
pub fn run(&mut self, events: Arc<ArrayQueue<Event>>) {
let (lock, cvar) = &*FINISHED;
let mut finished = lock.lock().unwrap();
self.write_all(events);
Expand Down
28 changes: 18 additions & 10 deletions analysis/runtime/src/runtime/scoped_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::{
sync::{
mpsc::{self, SyncSender},
Mutex,
},
sync::{Arc, Mutex},
thread,
};

use crossbeam_queue::ArrayQueue;
use enum_dispatch::enum_dispatch;
use once_cell::sync::OnceCell;

Expand Down Expand Up @@ -117,16 +115,27 @@ impl Runtime for MainThreadRuntime {
}

pub struct BackgroundThreadRuntime {
tx: SyncSender<Event>,
tx: Arc<ArrayQueue<Event>>,
finalized: OnceCell<()>,
}

impl BackgroundThreadRuntime {
fn push_event(&self, mut event: Event) {
// We have no choice but to spin here because
// we might be inside a signal handler
while let Err(event_back) = self.tx.push(event) {
std::hint::spin_loop();
event = event_back;
}
}
}

impl ExistingRuntime for BackgroundThreadRuntime {
fn finalize(&self) {
// Only run the finalizer once.
self.finalized.get_or_init(|| {
// Notify the backend that we're done.
self.tx.send(Event::done()).unwrap();
self.push_event(Event::done());

// Wait for the backend thread to finish.
let (lock, cvar) = &*FINISHED;
Expand All @@ -147,9 +156,7 @@ impl ExistingRuntime for BackgroundThreadRuntime {
fn send_event(&self, event: Event) {
match self.finalized.get() {
None => {
// `.unwrap()` as we're in no place to handle an error here,
// unless we should silently drop the [`Event`] instead.
self.tx.send(event).unwrap();
self.push_event(event);
}
Some(()) => {
// Silently drop the [`Event`] as the [`BackgroundThreadRuntime`] has already been [`BackgroundThreadRuntime::finalize`]d.
Expand All @@ -172,7 +179,8 @@ impl Runtime for BackgroundThreadRuntime {
/// Initialize the [`BackgroundThreadRuntime`], which includes [`thread::spawn`]ing,
/// so it must be run post-`main`.
fn try_init(mut backend: Backend) -> Result<Self, AnyError> {
let (tx, rx) = mpsc::sync_channel(1024);
let tx = Arc::new(ArrayQueue::new(1048576));
let rx = Arc::clone(&tx);
thread::spawn(move || backend.run(rx));
Ok(Self {
tx,
Expand Down

0 comments on commit 64faf2f

Please sign in to comment.