Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for handling SIGINT and SIGCHLD from bgworker #1229

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions pgrx/src/bgworkers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use std::time::Duration;
pub static mut PREV_SHMEM_STARTUP_HOOK: Option<unsafe extern "C" fn()> = None;
static GOT_SIGHUP: AtomicBool = AtomicBool::new(false);
static GOT_SIGTERM: AtomicBool = AtomicBool::new(false);
static GOT_SIGINT: AtomicBool = AtomicBool::new(false);
static GOT_SIGCHLD: AtomicBool = AtomicBool::new(false);

bitflags! {
struct BGWflags: i32 {
Expand All @@ -35,6 +37,8 @@ bitflags! {
pub struct SignalWakeFlags: i32 {
const SIGHUP = 0x1;
const SIGTERM = 0x2;
const SIGINT = 0x4;
const SIGCHLD = 0x8;
}
}

Expand Down Expand Up @@ -103,7 +107,10 @@ impl BackgroundWorker {
.expect("'extra' is not valid UTF8")
}

/// Have we received a SIGUP?
/// Have we received a SIGHUP since previously calling this function? This resets the
/// internal boolean that tracks if SIGHUP was received. So when calling this twice in
/// in a row, the second time will always return false.
#[must_use = "you aren't getting the same bool back if you call this twice"]
pub fn sighup_received() -> bool {
unsafe {
assert!(!pg_sys::MyBgworkerEntry.is_null(), "BackgroundWorker associated functions can only be called from a registered background worker");
Expand All @@ -112,7 +119,10 @@ impl BackgroundWorker {
GOT_SIGHUP.swap(false, Ordering::SeqCst)
}

/// Have we received a SIGTERM?
/// Have we received a SIGTERM since previously calling this function? This resets the
/// internal boolean that tracks if SIGTERM was received. So when calling this twice in
/// in a row, the second time will always return false.
#[must_use = "you aren't getting the same bool back if you call this twice"]
pub fn sigterm_received() -> bool {
unsafe {
assert!(!pg_sys::MyBgworkerEntry.is_null(), "BackgroundWorker associated functions can only be called from a registered background worker");
Expand All @@ -121,6 +131,30 @@ impl BackgroundWorker {
GOT_SIGTERM.swap(false, Ordering::SeqCst)
}

/// Have we received a SIGINT since previously calling this function? This resets the
/// internal boolean that tracks if SIGINT was received. So when calling this twice in
/// in a row, the second time will always return false.
#[must_use = "you aren't getting the same bool back if you call this twice"]
pub fn sigint_received() -> bool {
unsafe {
assert!(!pg_sys::MyBgworkerEntry.is_null(), "BackgroundWorker associated functions can only be called from a registered background worker");
}
// toggle the bool to false, returning whatever it was
GOT_SIGINT.swap(false, Ordering::SeqCst)
}

/// Have we received a SIGCHLD since previously calling this function? This resets the
/// internal boolean that tracks if SIGCHLD was received. So when calling this twice in
/// in a row, the second time will always return false.
#[must_use = "you aren't getting the same bool back if you call this twice"]
pub fn sigchld_received() -> bool {
unsafe {
assert!(!pg_sys::MyBgworkerEntry.is_null(), "BackgroundWorker associated functions can only be called from a registered background worker");
}
// toggle the bool to false, returning whatever it was
GOT_SIGCHLD.swap(false, Ordering::SeqCst)
}

/// Wait for the specified amount of time on the background worker's latch
///
/// Returns true if we're still supposed to be alive and haven't received a SIGTERM
Expand Down Expand Up @@ -189,6 +223,12 @@ impl BackgroundWorker {
if wake.contains(SignalWakeFlags::SIGTERM) {
pg_sys::pqsignal(pg_sys::SIGTERM as i32, Some(worker_spi_sigterm));
}
if wake.contains(SignalWakeFlags::SIGINT) {
pg_sys::pqsignal(pg_sys::SIGINT as i32, Some(worker_spi_sigint));
}
if wake.contains(SignalWakeFlags::SIGCHLD) {
pg_sys::pqsignal(pg_sys::SIGCHLD as i32, Some(worker_spi_sigchld));
}
pg_sys::BackgroundWorkerUnblockSignals();
}
}
Expand Down Expand Up @@ -224,6 +264,16 @@ unsafe extern "C" fn worker_spi_sigterm(_signal_args: i32) {
pg_sys::SetLatch(pg_sys::MyLatch);
}

unsafe extern "C" fn worker_spi_sigint(_signal_args: i32) {
GOT_SIGINT.store(true, Ordering::SeqCst);
pg_sys::SetLatch(pg_sys::MyLatch);
}

unsafe extern "C" fn worker_spi_sigchld(_signal_args: i32) {
GOT_SIGCHLD.store(true, Ordering::SeqCst);
pg_sys::SetLatch(pg_sys::MyLatch);
}

/// Dynamic background worker handle
pub struct DynamicBackgroundWorker {
handle: *mut pg_sys::BackgroundWorkerHandle,
Expand Down