Skip to content

Commit

Permalink
fix: use AtomicBool instead of RwLock
Browse files Browse the repository at this point in the history
  • Loading branch information
veeso committed Oct 13, 2024
1 parent acfe699 commit e7ac3df
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 73 deletions.
46 changes: 17 additions & 29 deletions src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ mod builder;
mod port;
mod worker;

use std::sync::atomic::AtomicBool;
// -- export
use std::sync::{mpsc, Arc, RwLock};
use std::sync::{mpsc, Arc};
use std::thread::{self, JoinHandle};
use std::time::Duration;

Expand Down Expand Up @@ -60,9 +61,9 @@ where
/// Max Time to wait when calling `recv()` on thread receiver
poll_timeout: Duration,
/// Indicates whether the worker should paused polling ports
paused: Arc<RwLock<bool>>,
paused: Arc<AtomicBool>,
/// Indicates whether the worker should keep running
running: Arc<RwLock<bool>>,
running: Arc<AtomicBool>,
/// Msg receiver from worker
recv: mpsc::Receiver<ListenerMsg<U>>,
/// Join handle for worker
Expand Down Expand Up @@ -104,14 +105,9 @@ where

/// Stop event listener
pub fn stop(&mut self) -> ListenerResult<()> {
{
// NOTE: keep these brackets to drop running after block
let mut running = match self.running.write() {
Ok(lock) => Ok(lock),
Err(_) => Err(ListenerError::CouldNotStop),
}?;
*running = false;
}
self.running
.store(false, std::sync::atomic::Ordering::Relaxed);

// Join thread
match self.thread.take().map(|x| x.join()) {
Some(Ok(_)) => Ok(()),
Expand All @@ -122,23 +118,15 @@ where

/// Pause event listener worker
pub fn pause(&mut self) -> ListenerResult<()> {
// NOTE: keep these brackets to drop running after block
let mut paused = match self.paused.write() {
Ok(lock) => Ok(lock),
Err(_) => Err(ListenerError::CouldNotStop),
}?;
*paused = true;
self.paused
.store(true, std::sync::atomic::Ordering::Relaxed);
Ok(())
}

/// Unpause event listener worker
pub fn unpause(&mut self) -> ListenerResult<()> {
// NOTE: keep these brackets to drop running after block
let mut paused = match self.paused.write() {
Ok(lock) => Ok(lock),
Err(_) => Err(ListenerError::CouldNotStop),
}?;
*paused = false;
self.paused
.store(false, std::sync::atomic::Ordering::Relaxed);
Ok(())
}

Expand All @@ -154,9 +142,9 @@ where
/// Setup the thread and returns the structs necessary to interact with it
fn setup_thread(ports: Vec<Port<U>>, tick_interval: Option<Duration>) -> ThreadConfig<U> {
let (sender, recv) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused = Arc::new(AtomicBool::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running = Arc::new(AtomicBool::new(true));
let running_t = Arc::clone(&running);
// Start thread
let thread = thread::spawn(move || {
Expand All @@ -183,8 +171,8 @@ where
U: Eq + PartialEq + Clone + PartialOrd + Send + 'static,
{
rx: mpsc::Receiver<ListenerMsg<U>>,
paused: Arc<RwLock<bool>>,
running: Arc<RwLock<bool>>,
paused: Arc<AtomicBool>,
running: Arc<AtomicBool>,
thread: JoinHandle<()>,
}

Expand All @@ -194,8 +182,8 @@ where
{
pub fn new(
rx: mpsc::Receiver<ListenerMsg<U>>,
paused: Arc<RwLock<bool>>,
running: Arc<RwLock<bool>>,
paused: Arc<AtomicBool>,
running: Arc<AtomicBool>,
thread: JoinHandle<()>,
) -> Self {
Self {
Expand Down
68 changes: 24 additions & 44 deletions src/listener/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
//! This module implements the worker thread for the event listener
use std::ops::{Add, Sub};
use std::sync::{mpsc, Arc, RwLock};
use std::sync::atomic::AtomicBool;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};

Expand All @@ -18,8 +19,8 @@ where
{
ports: Vec<Port<U>>,
sender: mpsc::Sender<ListenerMsg<U>>,
paused: Arc<RwLock<bool>>,
running: Arc<RwLock<bool>>,
paused: Arc<AtomicBool>,
running: Arc<AtomicBool>,
next_tick: Instant,
tick_interval: Option<Duration>,
}
Expand All @@ -31,8 +32,8 @@ where
pub(super) fn new(
ports: Vec<Port<U>>,
sender: mpsc::Sender<ListenerMsg<U>>,
paused: Arc<RwLock<bool>>,
running: Arc<RwLock<bool>>,
paused: Arc<AtomicBool>,
running: Arc<AtomicBool>,
tick_interval: Option<Duration>,
) -> Self {
Self {
Expand Down Expand Up @@ -77,18 +78,12 @@ where

/// Returns whether should keep running
fn running(&self) -> bool {
if let Ok(lock) = self.running.read() {
return *lock;
}
true
self.running.load(std::sync::atomic::Ordering::Relaxed)
}

/// Returns whether worker is paused
fn paused(&self) -> bool {
if let Ok(lock) = self.paused.read() {
return *lock;
}
false
self.paused.load(std::sync::atomic::Ordering::Relaxed)
}

/// Returns whether it's time to tick.
Expand Down Expand Up @@ -177,7 +172,7 @@ mod test {

use pretty_assertions::assert_eq;

use super::super::{ListenerError, ListenerResult};
use super::super::ListenerResult;
use super::*;
use crate::core::event::{Key, KeyEvent};
use crate::mock::{MockEvent, MockPoll};
Expand All @@ -186,9 +181,9 @@ mod test {
#[test]
fn worker_should_poll_multiple_times() {
let (tx, rx) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused = Arc::new(AtomicBool::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running = Arc::new(AtomicBool::new(true));
let running_t = Arc::clone(&running);

let mock_port = Port::new(Box::new(MockPoll::default()), Duration::from_secs(5), 10);
Expand All @@ -209,9 +204,9 @@ mod test {
#[test]
fn worker_should_send_poll() {
let (tx, rx) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused = Arc::new(AtomicBool::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running = Arc::new(AtomicBool::new(true));
let running_t = Arc::clone(&running);
let mut worker = EventListenerWorker::<MockEvent>::new(
vec![Port::new(
Expand All @@ -236,9 +231,9 @@ mod test {
#[test]
fn worker_should_send_tick() {
let (tx, rx) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused = Arc::new(AtomicBool::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running = Arc::new(AtomicBool::new(true));
let running_t = Arc::clone(&running);
let mut worker = EventListenerWorker::<MockEvent>::new(
vec![Port::new(
Expand All @@ -263,9 +258,9 @@ mod test {
#[test]
fn worker_should_calc_times_correctly_with_tick() {
let (tx, rx) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused = Arc::new(AtomicBool::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running = Arc::new(AtomicBool::new(true));
let running_t = Arc::clone(&running);
let mut worker = EventListenerWorker::<MockEvent>::new(
vec![Port::new(
Expand All @@ -292,25 +287,17 @@ mod test {
// Now should no more tick and poll
assert_eq!(worker.should_tick(), false);
// Stop
{
let mut running_flag = match running.write() {
Ok(lock) => Ok(lock),
Err(_) => Err(ListenerError::CouldNotStop),
}
.ok()
.unwrap();
*running_flag = false;
}
running.store(false, std::sync::atomic::Ordering::Relaxed);
assert_eq!(worker.running(), false);
drop(rx);
}

#[test]
fn worker_should_calc_times_correctly_without_tick() {
let (tx, rx) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused = Arc::new(AtomicBool::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running = Arc::new(AtomicBool::new(true));
let running_t = Arc::clone(&running);
let worker = EventListenerWorker::<MockEvent>::new(
vec![Port::new(
Expand All @@ -332,15 +319,8 @@ mod test {
// Next event should be in 3 second (poll)
assert!(worker.next_event() <= Duration::from_secs(3));
// Stop
{
let mut running_flag = match running.write() {
Ok(lock) => Ok(lock),
Err(_) => Err(ListenerError::CouldNotStop),
}
.ok()
.unwrap();
*running_flag = false;
}
running.store(false, std::sync::atomic::Ordering::Relaxed);

assert_eq!(worker.running(), false);
drop(rx);
}
Expand All @@ -349,9 +329,9 @@ mod test {
#[should_panic]
fn worker_should_panic_when_trying_next_tick_without_it() {
let (tx, _) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused = Arc::new(AtomicBool::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running = Arc::new(AtomicBool::new(true));
let running_t = Arc::clone(&running);
let mut worker =
EventListenerWorker::<MockEvent>::new(vec![], tx, paused_t, running_t, None);
Expand Down

0 comments on commit e7ac3df

Please sign in to comment.