diff --git a/Cargo.lock b/Cargo.lock index e16a41b..2cc3095 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -310,6 +310,7 @@ dependencies = [ "regex", "serde", "serde_json", + "signal-hook", "simple_logger", "syslog", "tinyvec", @@ -551,6 +552,25 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" +[[package]] +name = "signal-hook" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "simple_logger" version = "1.16.0" diff --git a/Cargo.toml b/Cargo.toml index a1c4113..6ee3d07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ caps = "0.5" libc = "0.2" exacl = ">= 0.6" regex = "1" +signal-hook = "0.3" tinyvec = { version = "1", features = ["alloc"] } log = "0.4" diff --git a/src/coalesce.rs b/src/coalesce.rs index 5f07c76..84123a2 100644 --- a/src/coalesce.rs +++ b/src/coalesce.rs @@ -1054,6 +1054,11 @@ impl<'a> Coalesce<'a> { Ok(()) } + /// Flush all in-flight event data, including partial events + pub fn flush(&mut self) { + self.expire_inflight(u64::MAX); + } + pub fn dump_state(&self, mut w: &mut dyn Write) -> Result<(), Box> { serde_json::to_writer( &mut w, @@ -1092,7 +1097,7 @@ impl<'a> Coalesce<'a> { impl Drop for Coalesce<'_> { fn drop(&mut self) { - self.expire_inflight(u64::MAX) + self.flush(); } } diff --git a/src/main.rs b/src/main.rs index e40ec19..2c45755 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,9 +12,13 @@ use std::os::unix::fs::PermissionsExt; use std::os::unix::io::FromRawFd; use std::os::unix::net::UnixStream; use std::path::Path; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use std::time::{Duration, SystemTime}; -use nix::unistd::{chown, setresgid, setresuid, Uid, User}; +use nix::unistd::{chown, execve, setresgid, setresuid, Uid, User}; use caps::securebits::set_keepcaps; use caps::{CapSet, Capability}; @@ -64,11 +68,11 @@ fn drop_privileges(runas_user: &User) -> Result<(), Box> { capabilities.insert(Capability::CAP_SYS_PTRACE); capabilities.insert(Capability::CAP_DAC_READ_SEARCH); caps::set(None, CapSet::Permitted, &capabilities) - .map_err(|e| format!("set effective capabilities: {}", e))?; + .map_err(|e| format!("set permitted capabilities: {}", e))?; caps::set(None, CapSet::Effective, &capabilities) .map_err(|e| format!("set effective capabilities: {}", e))?; - caps::set(None, CapSet::Inheritable, &HashSet::new()) - .map_err(|e| format!("set inherited capabilities: {}", e))?; + caps::set(None, CapSet::Inheritable, &capabilities) + .map_err(|e| format!("set inheritable capabilities: {}", e))?; set_keepcaps(false)?; Ok(()) @@ -270,6 +274,9 @@ fn run_app() -> Result<(), Box> { // Logged to syslog by caller return Err(e); } + if let Err(e) = caps::clear(None, CapSet::Ambient) { + log::warn!("could not set ambient capabilities: {}", e); + } // Initial setup is done at this point. @@ -294,7 +301,42 @@ fn run_app() -> Result<(), Box> { let dump_state_period = config.debug.dump_state_period.map(Duration::from_secs); let mut dump_state_last_t = SystemTime::now(); + let hup = Arc::new(AtomicBool::new(false)); + signal_hook::flag::register(signal_hook::consts::SIGHUP, Arc::clone(&hup))?; + loop { + if hup.load(Ordering::Relaxed) { + let buf = input.buffer(); + let lines = buf.split_inclusive(|c| *c == b'\n'); + log::info!("Got SIGHUP."); + for line in lines { + if let Err(e) = coalesce.process_line(line.to_vec()) { + if let Some(ref mut l) = error_logger { + l.write_all(line) + .and_then(|_| l.flush()) + .map_err(|e| format!("write log: {}", e))?; + } + let line = String::from_utf8_lossy(line).replace('\n', ""); + log::error!("Error {} processing msg: {}", e, &line); + } + } + coalesce.flush(); + log::info!("Restarting..."); + use std::ffi::CString; + let argv: Vec = env::args().map(|a| CString::new(a).unwrap()).collect(); + let env: Vec = env::vars() + .map(|(k, v)| CString::new(format!("{}={}", k, v)).unwrap()) + .collect(); + + let mut capabilities = HashSet::new(); + capabilities.insert(Capability::CAP_SYS_PTRACE); + capabilities.insert(Capability::CAP_DAC_READ_SEARCH); + if let Err(e) = caps::set(None, CapSet::Ambient, &capabilities) { + log::warn!("could not set ambient capabilities: {}", e); + } + execve(&argv[0], &argv, &env)?; + } + line.clear(); if input .read_until(b'\n', &mut line) @@ -303,6 +345,7 @@ fn run_app() -> Result<(), Box> { { break; } + stats.lines += 1; match coalesce.process_line(line.clone()) { Ok(()) => (),