Skip to content

Commit

Permalink
Clean up signal processing
Browse files Browse the repository at this point in the history
Signed-off-by: James Sturtevant <jstur@microsoft.com>
  • Loading branch information
jsturtevant committed Apr 28, 2023
1 parent 0d5591d commit 538321b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 80 deletions.
21 changes: 11 additions & 10 deletions crates/shim/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,26 @@
//!
use std::{collections::hash_map::DefaultHasher, fs::File, hash::Hasher, path::PathBuf};
#[cfg(windows)]
use std::{fs::OpenOptions, os::windows::prelude::OpenOptionsExt};
pub use containerd_shim_protos as protos;
pub use protos::{
shim::shim::DeleteResponse,
ttrpc::{context::Context, Result as TtrpcResult},
};

#[cfg(unix)]
use std::{
os::unix::{io::RawFd, net::UnixListener},
path::Path,
};

pub use containerd_shim_protos as protos;
#[cfg(unix)]
use nix::ioctl_write_ptr_bad;
pub use protos::{
shim::shim::DeleteResponse,
ttrpc::{context::Context, Result as TtrpcResult},
};
#[cfg(unix)]
ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize);

#[cfg(windows)]
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
#[cfg(windows)]
use std::{fs::OpenOptions, os::windows::prelude::OpenOptionsExt};

#[cfg(feature = "async")]
pub use crate::asynchronous::*;
Expand Down Expand Up @@ -117,8 +120,6 @@ cfg_async! {
pub use protos::ttrpc::r#async::TtrpcContext;
}

#[cfg(unix)]
ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize);

const TTRPC_ADDRESS: &str = "TTRPC_ADDRESS";

Expand Down
149 changes: 79 additions & 70 deletions crates/shim/src/synchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ cfg_unix! {
},
unistd::Pid,
};
use signal_hook::iterator::Signals;
use signal_hook::iterator::Signals as UnixSignals;
use std::os::unix::fs::FileTypeExt;
use std::{convert::TryFrom, fs, path::Path};
use std::os::fd::AsRawFd;
Expand Down Expand Up @@ -122,6 +122,12 @@ pub mod util;
#[derive(Default)]
pub struct ExitSignal(Mutex<bool>, Condvar);

// Wrapper type to help hide platform specific signal handling.
struct Signals{
#[cfg(unix)]
signals: UnixSignals,
}

#[allow(clippy::mutex_atomic)]
impl ExitSignal {
/// Set exit signal to shutdown shim server.
Expand Down Expand Up @@ -200,12 +206,9 @@ where
// Create shim instance
let mut config = opts.unwrap_or_default();

#[cfg(unix)]
// Setup signals
let signals = setup_signals(&config);

#[cfg(windows)]
setup_signals();
// Setup signals (On Linux need register signals before start main app according to signal_hook docs)
let signals = setup_signals(&config);

if !config.no_sub_reaper {
reap::set_subreaper()?;
Expand Down Expand Up @@ -234,10 +237,7 @@ where
Ok(())
}
"delete" => {
#[cfg(unix)]
std::thread::spawn(move || handle_signals(signals));
#[cfg(windows)]
std::thread::spawn(handle_signals);
let response = shim.delete_shim()?;
let stdout = std::io::stdout();
let mut locked = stdout.lock();
Expand Down Expand Up @@ -297,29 +297,33 @@ fn create_server(_flags: args::Flags) -> Result<Server> {
Ok(server)
}

#[cfg(unix)]
fn setup_signals(config: &Config) -> Signals {
let signals = Signals::new([SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed");
if !config.no_reaper {
signals.add_signal(SIGCHLD).expect("add signal failed");
}
signals
}

#[cfg(windows)]
fn setup_signals() {
unsafe {
SEMAPHORE = CreateSemaphoreA(ptr::null_mut(), 0, MAX_SEM_COUNT, ptr::null());
if SEMAPHORE == 0 {
panic!("Failed to create semaphore: {}", io::Error::last_os_error());
fn setup_signals(_config: &Config) -> Option<Signals> {
#[cfg(unix)]
{
let signals = UnixSignals::new([SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed");
if !_config.no_reaper {
signals.add_signal(SIGCHLD).expect("add signal failed");
}
Some(Signals(signals))
}

if SetConsoleCtrlHandler(Some(signal_handler), 1) == 0 {
let e = io::Error::last_os_error();
CloseHandle(SEMAPHORE);
SEMAPHORE = 0 as HANDLE;
panic!("Failed to set console handler: {}", e);
#[cfg(windows)]
{
unsafe {
SEMAPHORE = CreateSemaphoreA(ptr::null_mut(), 0, MAX_SEM_COUNT, ptr::null());
if SEMAPHORE == 0 {
panic!("Failed to create semaphore: {}", io::Error::last_os_error());
}

if SetConsoleCtrlHandler(Some(signal_handler), 1) == 0 {
let e = io::Error::last_os_error();
CloseHandle(SEMAPHORE);
SEMAPHORE = 0 as HANDLE;
panic!("Failed to set console handler: {}", e);
}
}
None
}
}

Expand All @@ -329,56 +333,61 @@ unsafe extern "system" fn signal_handler(_: u32) -> i32 {
1
}

#[cfg(unix)]
fn handle_signals(mut signals: Signals) {
loop {
for sig in signals.wait() {
match sig {
SIGTERM | SIGINT => {
debug!("received {}", sig);
}
SIGCHLD => loop {
// Note that this thread sticks to child even it is suspended.
match wait::waitpid(Some(Pid::from_raw(-1)), Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(pid, status)) => {
monitor::monitor_notify_by_pid(pid.as_raw(), status)
.unwrap_or_else(|e| error!("failed to send exit event {}", e))
}
Ok(WaitStatus::Signaled(pid, sig, _)) => {
debug!("child {} terminated({})", pid, sig);
let exit_code = 128 + sig as i32;
monitor::monitor_notify_by_pid(pid.as_raw(), exit_code)
.unwrap_or_else(|e| error!("failed to send signal event {}", e))
}
Err(Errno::ECHILD) => {
break;
fn handle_signals(mut _signals: Option<Signals>) {
#[cfg(unix)]
{
let signals = _signals.take().unwrap();
loop {
for sig in signals.wait() {
match sig {
SIGTERM | SIGINT => {
debug!("received {}", sig);
}
SIGCHLD => loop {
// Note that this thread sticks to child even it is suspended.
match wait::waitpid(Some(Pid::from_raw(-1)), Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(pid, status)) => {
monitor::monitor_notify_by_pid(pid.as_raw(), status)
.unwrap_or_else(|e| error!("failed to send exit event {}", e))
}
Ok(WaitStatus::Signaled(pid, sig, _)) => {
debug!("child {} terminated({})", pid, sig);
let exit_code = 128 + sig as i32;
monitor::monitor_notify_by_pid(pid.as_raw(), exit_code)
.unwrap_or_else(|e| error!("failed to send signal event {}", e))
}
Err(Errno::ECHILD) => {
break;
}
Err(e) => {
// stick until all children will be successfully waited, even some unexpected error occurs.
warn!("error occurred in signal handler: {}", e);
}
_ => {} // stick until exit
}
Err(e) => {
// stick until all children will be successfully waited, even some unexpected error occurs.
warn!("error occurred in signal handler: {}", e);
},
_ => {
if let Ok(sig) = Signal::try_from(sig) {
debug!("received {}", sig);
} else {
warn!("received invalid signal {}", sig);
}
_ => {} // stick until exit
}
},
_ => {
if let Ok(sig) = Signal::try_from(sig) {
debug!("received {}", sig);
} else {
warn!("received invalid signal {}", sig);
}
}
}
}
}
}

#[cfg(windows)]
// must start on thread as waitforSingleObject puts the current thread to sleep
fn handle_signals() {
unsafe {
WaitForSingleObject(SEMAPHORE, INFINITE);
//Windows doesn't have similiar signal like SIGCHLD
// We could implement something if required but for now
#[cfg(windows)]
{
// must start on thread as waitforSingleObject puts the current thread to sleep
loop {
unsafe {
WaitForSingleObject(SEMAPHORE, INFINITE);
//Windows doesn't have similar signal like SIGCHLD
// We could implement something if required but for now
}
}
}
}

Expand Down

0 comments on commit 538321b

Please sign in to comment.