diff --git a/kunai-common/src/bpf_events.rs b/kunai-common/src/bpf_events.rs index e6e11cf..6b68fca 100644 --- a/kunai-common/src/bpf_events.rs +++ b/kunai-common/src/bpf_events.rs @@ -123,6 +123,8 @@ pub enum Type { CacheHash, #[str("error")] Error, + #[str("syscore_resume")] + SyscoreResume, // !!! all new event types must be put before max #[str("max")] diff --git a/kunai-common/src/bpf_events/events.rs b/kunai-common/src/bpf_events/events.rs index e25e79c..54bfd77 100644 --- a/kunai-common/src/bpf_events/events.rs +++ b/kunai-common/src/bpf_events/events.rs @@ -32,6 +32,8 @@ mod prctl; pub use prctl::*; pub mod error; pub use error::{ErrorData, ErrorEvent}; +pub mod syscore_resume; +pub use syscore_resume::*; // prevent using correlation event in bpf code not_bpf_target_code! { @@ -76,6 +78,7 @@ const fn max_bpf_event_size() -> usize { Type::FileUnlink => UnlinkEvent::size_of(), Type::Unknown | Type::EndEvents | Type::Correlation | Type::CacheHash | Type::Max => 0, Type::Error => ErrorEvent::size_of(), + Type::SyscoreResume => SysCoreResumeEvent::size_of(), // never handle _ pattern otherwise this function loses all interest }; if size > max { diff --git a/kunai-common/src/bpf_events/events/syscore_resume.rs b/kunai-common/src/bpf_events/events/syscore_resume.rs new file mode 100644 index 0000000..a1b9ae9 --- /dev/null +++ b/kunai-common/src/bpf_events/events/syscore_resume.rs @@ -0,0 +1,6 @@ +use crate::bpf_events::Event; + +pub type SysCoreResumeEvent = Event; + +#[repr(C)] +pub struct SysCoreResumeData {} diff --git a/kunai-ebpf/src/probes.rs b/kunai-ebpf/src/probes.rs index 27c0e0e..f633e69 100644 --- a/kunai-ebpf/src/probes.rs +++ b/kunai-ebpf/src/probes.rs @@ -30,6 +30,7 @@ mod mprotect; mod prctl; mod schedule; mod send_data; +mod syscore_resume; /// macro to track ignored results macro_rules! ignore_result { diff --git a/kunai-ebpf/src/probes/syscore_resume.rs b/kunai-ebpf/src/probes/syscore_resume.rs new file mode 100644 index 0000000..f7be03a --- /dev/null +++ b/kunai-ebpf/src/probes/syscore_resume.rs @@ -0,0 +1,27 @@ +use aya_ebpf::programs::ProbeContext; + +use super::*; + +// this probe is hit when the system is resumed, it is a way to +// create a trigger for program reload as a bug has been identified +// for some kretprobes not surviving to a suspend/resume cycle +// https://bugzilla.kernel.org/show_bug.cgi?id=218775 +#[kprobe(function = "syscore_resume")] +pub fn enter_syscore_resume(ctx: ProbeContext) -> u32 { + match unsafe { try_syscore_resume(&ctx) } { + Ok(_) => errors::BPF_PROG_SUCCESS, + Err(s) => { + error!(&ctx, s); + errors::BPF_PROG_FAILURE + } + } +} + +unsafe fn try_syscore_resume(ctx: &ProbeContext) -> ProbeResult<()> { + let evt = alloc::alloc_zero::()?; + + evt.init_from_current_task(Type::SyscoreResume)?; + + pipe_event(ctx, evt); + Ok(()) +} diff --git a/kunai/src/bin/main.rs b/kunai/src/bin/main.rs index 29f823c..d8c48b3 100644 --- a/kunai/src/bin/main.rs +++ b/kunai/src/bin/main.rs @@ -24,6 +24,7 @@ use kunai_common::bpf_events::{ use kunai_common::config::{BpfConfig, Filter}; use kunai_common::inspect_err; +use kunai_common::version::KernelVersion; use log::LevelFilter; use serde::{Deserialize, Serialize}; @@ -33,6 +34,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::fs::{self, File}; use std::io::{self, BufRead, Write}; use std::net::IpAddr; + use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -40,6 +42,7 @@ use std::sync::mpsc::{channel, Receiver, SendError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; +use std::time::Duration; use std::{process, thread}; use aya::{ @@ -1247,6 +1250,7 @@ impl EventConsumer { }, Type::Error => panic!("error events should be processed earlier"), + Type::SyscoreResume => { /* just ignore it */ } } } } @@ -1259,6 +1263,10 @@ struct EventProducer { filter: Filter, stats: AyaHashMap, perf_array: AsyncPerfEventArray, + tasks: Vec>>, + stop: bool, + // flag to be set when the producer needs to reload + reload: bool, } #[inline(always)] @@ -1289,6 +1297,9 @@ impl EventProducer { filter, stats: stats_map, perf_array, + tasks: vec![], + stop: false, + reload: false, }) } @@ -1373,7 +1384,7 @@ impl EventProducer { /// this function must return true if main processing loop has to pass to the next event /// after the call. #[inline] - fn process_time_critical(&self, e: &mut EncodedEvent) -> bool { + fn process_time_critical(&mut self, e: &mut EncodedEvent) -> bool { let i = unsafe { e.info() }.expect("info should not fail here"); #[allow(clippy::single_match)] @@ -1419,6 +1430,13 @@ impl EventProducer { error::Level::Warn => warn!("{}", e), error::Level::Error => error!("{}", e), } + // we don't need to process such event further + return true; + } + Type::SyscoreResume => { + debug!("received syscore_resume event"); + self.reload = true; + // we don't need to process such event further return true; } _ => {} @@ -1483,7 +1501,7 @@ impl EventProducer { let conf = config.clone(); // process each perf buffer in a separate task - task::spawn(async move { + let t = task::spawn(async move { // the number of buffers we want to use gives us the number of events we can read // in one go in userland let mut buffers = (0..conf.max_buffered_events) @@ -1588,15 +1606,55 @@ impl EventProducer { // all threads wait that piped events are processed so that the reducer does not // handle events being piped in the same time by others bar.wait().await; + + // we break the loop if processor is stopped + if event_reader.lock().await.stop { + break; + } } #[allow(unreachable_code)] Ok::<_, PerfBufferError>(()) }); + + shared.lock().await.tasks.push(t); } shared } + + fn stop(&mut self) { + self.stop = true + } + + #[inline(always)] + fn is_finished(&self) -> bool { + self.tasks.iter().all(|t| t.is_finished()) + } + + async fn join(&mut self) -> anyhow::Result<()> { + while let Some(t) = self.tasks.pop() { + if t.is_finished() { + t.await??; + continue; + } + self.tasks.push(t) + } + Ok(()) + } + + async fn arc_join(arc: &Arc>, sleep: Duration) -> anyhow::Result<()> { + loop { + // drop lock before sleep + { + if arc.lock().await.is_finished() { + break; + } + } + time::sleep(sleep).await; + } + arc.lock().await.join().await + } } const ABOUT_KUNAI: &str = r#" @@ -1679,6 +1737,82 @@ enum Command { Replay(ReplayOpt), } +const BPF_ELF: &[u8] = { + #[cfg(debug_assertions)] + let d = include_bytes_aligned!("../../../target/bpfel-unknown-none/debug/kunai-ebpf"); + #[cfg(not(debug_assertions))] + let d = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/kunai-ebpf"); + d +}; + +fn prepare_bpf(kernel: KernelVersion, conf: &Config, vll: VerifierLogLevel) -> anyhow::Result { + let mut bpf = BpfLoader::new() + .verifier_log_level(vll) + .set_global("LINUX_KERNEL_VERSION", &kernel, true) + .load(BPF_ELF)?; + + BpfConfig::init_config_in_bpf(&mut bpf, conf.clone().try_into()?) + .expect("failed to initialize bpf configuration"); + + Ok(bpf) +} + +fn load_and_attach_bpf(kernel: KernelVersion, bpf: &mut Bpf) -> anyhow::Result> { + // make possible probe selection in debug + #[allow(unused_mut)] + let mut en_probes: Vec = vec![]; + #[cfg(debug_assertions)] + if let Ok(enable) = std::env::var("PROBES") { + enable.split(',').for_each(|s| en_probes.push(s.into())); + } + + // We need to parse eBPF ELF to extract section names + let mut programs = Programs::with_bpf(bpf).with_elf_info(BPF_ELF)?; + + kunai::configure_probes(&mut programs, kernel); + + // generic program loader + for (_, p) in programs.sorted_by_prio() { + // filtering probes to enable (only available in debug) + if !en_probes.is_empty() && en_probes.iter().filter(|e| p.name.contains(*e)).count() == 0 { + continue; + } + + // we force enabling of selected probes + // debug probes are disabled by default + if !en_probes.is_empty() { + p.enable(); + } + + info!( + "loading: {} {:?} with priority={}", + p.name, + p.prog_type(), + p.prio + ); + + if !p.enable { + warn!("{} probe has been disabled", p.name); + continue; + } + + if !p.is_compatible(&kernel) { + warn!( + "{} probe is not compatible with current kernel: min={} max={} current={}", + p.name, + p.compat.min(), + p.compat.max(), + kernel + ); + continue; + } + + p.load_and_attach()?; + } + + Ok(programs) +} + impl Command { fn replay(conf: Config, o: ReplayOpt) -> anyhow::Result<()> { let mut p = EventConsumer::with_config(conf.stdout_output())?; @@ -1728,6 +1862,7 @@ impl Command { | Type::Error | Type::EndEvents | Type::TaskSched + | Type::SyscoreResume | Type::Max => {} } } @@ -1744,87 +1879,46 @@ impl Command { "You need to be root to run this program, this is necessary to load eBPF programs", )); } - let current_kernel = Utsname::kernel_version()?; - let bpf_elf = { - #[cfg(debug_assertions)] - let d = include_bytes_aligned!("../../../target/bpfel-unknown-none/debug/kunai-ebpf"); - #[cfg(not(debug_assertions))] - let d = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/kunai-ebpf"); - d - }; - - let mut bpf = BpfLoader::new() - .verifier_log_level(vll) - .set_global("LINUX_KERNEL_VERSION", ¤t_kernel, true) - .load(bpf_elf)?; - - BpfConfig::init_config_in_bpf(&mut bpf, conf.clone().try_into()?) - .expect("failed to initialize bpf configuration"); - // we start event reader and event processor before loading the programs // if we load the programs first we might have some event lost errors let (sender, receiver) = channel::(); - EventProducer::with_params(&mut bpf, conf.clone(), sender)? - .produce() - .await; - EventConsumer::with_config(conf)?.consume(receiver)?; - - // make possible probe selection in debug - #[allow(unused_mut)] - let mut en_probes: Vec = vec![]; - #[cfg(debug_assertions)] - if let Ok(enable) = std::env::var("PROBES") { - enable.split(',').for_each(|s| en_probes.push(s.into())); - } + // we start consumer + EventConsumer::with_config(conf.clone())?.consume(receiver)?; - // We need to parse eBPF ELF to extract section names - let mut programs = Programs::from_bpf(&mut bpf).with_elf_info(bpf_elf)?; + // we spawn a task to reload producer when needed + task::spawn(async move { + loop { + info!("Starting event producer"); + // we start producer + let mut bpf = prepare_bpf(current_kernel, &conf, vll)?; + let arc_prod = EventProducer::with_params(&mut bpf, conf.clone(), sender.clone())? + .produce() + .await; - kunai::configure_probes(&mut programs, current_kernel); + // we load and attach bpf programs + load_and_attach_bpf(current_kernel, &mut bpf)?; - // generic program loader - for (_, mut p) in programs.into_vec_sorted_by_prio() { - // filtering probes to enable (only available in debug) - if !en_probes.is_empty() - && en_probes.iter().filter(|e| p.name.contains(*e)).count() == 0 - { - continue; - } - - // we force enabling of selected probes - // debug probes are disabled by default - if !en_probes.is_empty() { - p.enable(); - } - - info!( - "loading: {} {:?} with priority={}", - p.name, - p.prog_type(), - p.prio - ); - - if !p.enable { - warn!("{} probe has been disabled", p.name); - continue; - } - - if !p.is_compatible(¤t_kernel) { - warn!( - "{} probe is not compatible with current kernel: min={} max={} current={}", - p.name, - p.compat.min(), - p.compat.max(), - current_kernel - ); - continue; + loop { + // block make sure lock is dropped before sleeping + if arc_prod.lock().await.reload { + info!("Reloading event producer"); + arc_prod.lock().await.stop(); + // we wait for event producer to be ready + EventProducer::arc_join(&arc_prod, Duration::from_millis(500)).await?; + + // we do not need to unload programs as this will be done at drop + break; + } + time::sleep(Duration::from_millis(500)).await; + } } - p.attach()?; - } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); info!("Waiting for Ctrl-C..."); signal::ctrl_c().await?; @@ -1962,6 +2056,6 @@ async fn main() -> Result<(), anyhow::Error> { // We finished preparing config match cli.command { Some(Command::Replay(o)) => return Command::replay(conf, o), - _ => return Command::run(conf, verifier_level).await, + _ => Command::run(conf, verifier_level).await, } } diff --git a/kunai/src/compat.rs b/kunai/src/compat.rs index 55bec86..8f07828 100644 --- a/kunai/src/compat.rs +++ b/kunai/src/compat.rs @@ -1,5 +1,5 @@ use aya::{ - programs::{self, ProgramError}, + programs::{self, kprobe::KProbeLinkId, trace_point::TracePointLinkId, ProgramError}, Bpf, }; use aya_obj::generated::bpf_prog_type; @@ -16,6 +16,8 @@ pub enum Error { NoTpCategory(String), #[error("missing kernel attach function for program: {0}")] NoAttachFn(String), + #[error("wrong link id kind")] + WrongLinkId, #[error("{0}")] Program(#[from] ProgramError), } @@ -46,7 +48,7 @@ pub struct Programs<'a> { } impl<'a> Programs<'a> { - pub fn from_bpf(bpf: &'a mut Bpf) -> Self { + pub fn with_bpf(bpf: &'a mut Bpf) -> Self { let m = bpf .programs_mut() .map(|(name, p)| { @@ -85,6 +87,38 @@ impl<'a> Programs<'a> { sorted.sort_unstable_by_key(|(_, p)| p.prio_by_prog()); sorted } + + pub fn sorted_by_prio(&mut self) -> Vec<(&String, &mut Program<'a>)> { + let mut sorted = self.m.iter_mut().collect::>(); + sorted.sort_unstable_by_key(|(_, p)| p.prio_by_prog()); + sorted + } +} + +#[derive(Debug)] +pub enum LinkId { + KProbe(KProbeLinkId), + Tracepoint(TracePointLinkId), +} + +impl TryFrom for KProbeLinkId { + type Error = Error; + fn try_from(value: LinkId) -> Result { + match value { + LinkId::KProbe(l) => Ok(l), + _ => Err(Error::WrongLinkId), + } + } +} + +impl TryFrom for TracePointLinkId { + type Error = Error; + fn try_from(value: LinkId) -> Result { + match value { + LinkId::Tracepoint(l) => Ok(l), + _ => Err(Error::WrongLinkId), + } + } } pub struct Program<'a> { @@ -94,6 +128,9 @@ pub struct Program<'a> { pub compat: Compatibility, pub program: &'a mut programs::Program, pub enable: bool, + pub link_id: Option, + pub loaded: bool, + pub attached: bool, } impl<'a> Program<'a> { @@ -105,6 +142,9 @@ impl<'a> Program<'a> { program: p, compat: Compatibility::default(), enable: true, + link_id: None, + loaded: false, + attached: false, } } @@ -194,6 +234,43 @@ impl<'a> Program<'a> { self.enable = false } + pub fn load(&mut self) -> Result<(), Error> { + let program = self.prog_mut(); + + match program { + programs::Program::TracePoint(p) => { + p.load()?; + } + programs::Program::KProbe(p) => { + p.load()?; + } + _ => { + unimplemented!() + } + } + self.loaded = true; + Ok(()) + } + + pub fn unload(&mut self) -> Result<(), Error> { + let program = self.prog_mut(); + + match program { + programs::Program::TracePoint(p) => { + p.unload()?; + } + programs::Program::KProbe(p) => { + p.unload()?; + } + _ => { + unimplemented!() + } + } + self.loaded = false; + self.attached = false; + Ok(()) + } + pub fn attach(&mut self) -> Result<(), Error> { let program_name = self.name.clone(); let kernel_attach_fn = self.attach_func(); @@ -201,21 +278,39 @@ impl<'a> Program<'a> { let program = self.prog_mut(); match program { - programs::Program::TracePoint(program) => { - program.load()?; + programs::Program::TracePoint(p) => { let cat = tracepoint_category.ok_or(Error::NoTpCategory(program_name.clone()))?; let attach = kernel_attach_fn.ok_or(Error::NoAttachFn(program_name))?; - program.attach(&cat, &attach)?; + self.link_id = Some(LinkId::Tracepoint(p.attach(&cat, &attach)?)); } - programs::Program::KProbe(program) => { - program.load()?; + programs::Program::KProbe(p) => { let attach = kernel_attach_fn.ok_or(Error::NoAttachFn(program_name))?; - program.attach(attach, 0)?; + self.link_id = Some(LinkId::KProbe(p.attach(attach, 0)?)); } _ => { unimplemented!() } } + self.attached = true; + Ok(()) + } + + pub fn load_and_attach(&mut self) -> Result<(), Error> { + self.load()?; + self.attach() + } + + pub fn detach(&mut self) -> Result<(), Error> { + if let Some(link_id) = self.link_id.take() { + match self.prog_mut() { + programs::Program::TracePoint(p) => p.detach(link_id.try_into()?)?, + programs::Program::KProbe(p) => p.detach(link_id.try_into()?)?, + _ => { + unimplemented!() + } + } + } + self.attached = false; Ok(()) } } diff --git a/kunai/src/tests/kernel.rs b/kunai/src/tests/kernel.rs index efad1d5..03e794e 100644 --- a/kunai/src/tests/kernel.rs +++ b/kunai/src/tests/kernel.rs @@ -59,7 +59,7 @@ fn integration() -> anyhow::Result<()> { .set_global("LINUX_KERNEL_VERSION", ¤t_kernel, true) .load(bpf_elf)?; - let mut programs = Programs::from_bpf(&mut bpf).with_elf_info(bpf_elf)?; + let mut programs = Programs::with_bpf(&mut bpf).with_elf_info(bpf_elf)?; kunai::configure_probes(&mut programs, current_kernel); @@ -88,7 +88,7 @@ fn integration() -> anyhow::Result<()> { continue; } - p.attach()?; + p.load_and_attach()?; } Ok(())