Skip to content

Commit

Permalink
feat: Include list of image layer directories in ContainerInfo
Browse files Browse the repository at this point in the history
For each container, try to retrieve information about image layers paths
(from the host filesystem's perspective) and include them in
`ContainerInfo`.
  • Loading branch information
vadorovsky committed Jun 17, 2024
1 parent dfc62e1 commit 2774c52
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 91 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/bpf-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ procfs = { workspace = true }
libc = { workspace = true }
glob = { workspace = true }
hex = { workspace = true }
hyper = { workspace = true }
hyperlocal = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
lazy_static = { workspace = true }
Expand Down
77 changes: 74 additions & 3 deletions crates/bpf-common/src/containers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ use std::{
os::unix::ffi::OsStringExt,
path::{Path, PathBuf},
ptr,
str::FromStr,
};

use diesel::{connection::SimpleConnection, prelude::*};
use hyper::{body, Client};
use hyperlocal::{UnixClientExt, Uri as HyperlocalUri};
use ini::Ini;
use nix::unistd::Uid;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -138,13 +141,41 @@ struct LibpodDBContainerConfig {
json: String,
}

/// Docker API response for `image inspect` request.
#[derive(Debug, Deserialize)]
struct ImageInspect {
#[serde(rename = "GraphDriver")]
graph_driver: GraphDriver,
}

/// Data associated with Docker graphdriver.
#[derive(Debug, Deserialize)]
struct GraphDriver {
#[serde(rename = "Data")]
data: GraphDriverData,
}

#[derive(Debug, Deserialize)]
struct GraphDriverData {
#[serde(rename = "LowerDir")]
lower_dir: Option<String>,
#[serde(rename = "MergedDir")]
merged_dir: Option<PathBuf>,
#[serde(rename = "UpperDir")]
upper_dir: Option<PathBuf>,
#[serde(rename = "WorkDir")]
work_dir: Option<PathBuf>,
}

/// Container information used in Pulsar alerts and rules.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, Validatron)]
pub struct ContainerInfo {
pub id: String,
pub name: String,
pub image: String,
pub image_digest: String,
#[validatron(skip)]
pub layers: Vec<PathBuf>,
}

impl fmt::Display for ContainerInfo {
Expand All @@ -158,19 +189,19 @@ impl fmt::Display for ContainerInfo {
}

impl ContainerInfo {
pub fn from_container_id(
pub async fn from_container_id(
container_id: ContainerId,
uid: Uid,
) -> Result<Option<Self>, ContainerError> {
let info = match container_id {
ContainerId::Docker(id) => Self::from_docker_id(id),
ContainerId::Docker(id) => Self::from_docker_id(id).await,
ContainerId::Libpod(id) => Self::from_libpod_id(id, uid),
};

info.map(Some)
}

fn from_docker_id(id: String) -> Result<Self, ContainerError> {
async fn from_docker_id(id: String) -> Result<Self, ContainerError> {
const DOCKER_CONTAINERS_PATH: &str = "/var/lib/docker/containers";

let path = PathBuf::from(DOCKER_CONTAINERS_PATH)
Expand All @@ -194,11 +225,49 @@ impl ContainerInfo {
let image = config.config.image;
let image_digest = config.image_digest;

// `image_digest` has format like:
//
// ```
// sha256:1d34ffeaf190be23d3de5a8de0a436676b758f48f835c3a2d4768b798c15a7f1
// ```
//
// The unprefixed digest is used as an image ID.
let image_id = image_digest.split(':').last().unwrap();

let client = Client::unix();
let url = HyperlocalUri::new(
"/var/run/docker.sock",
&format!("/images/{}/json", image_id),
);

let response = client.get(url.into()).await.unwrap();
let body_bytes = body::to_bytes(response).await.unwrap();

let response: ImageInspect = serde_json::from_slice(&body_bytes).unwrap();

// Gather all filesystem layer paths.
let mut layers = Vec::new();
if let Some(lower_dirs) = response.graph_driver.data.lower_dir {
for lower_dir in lower_dirs.split(':') {
layers.push(PathBuf::from_str(lower_dir).unwrap());
}
}
if let Some(merged_dir) = response.graph_driver.data.merged_dir {
layers.push(merged_dir);
}
if let Some(upper_dir) = response.graph_driver.data.upper_dir {
layers.push(upper_dir);
}
if let Some(work_dir) = response.graph_driver.data.work_dir {
layers.push(work_dir);
}

Ok(Self {
id,
name,
image,
image_digest,
layers,
})
}

Expand Down Expand Up @@ -266,6 +335,8 @@ impl ContainerInfo {
name: config.name,
image: config.rootfs_image_name,
image_digest: image.digest.clone(),
// TODO(vadorovsky): Parse layer information in Podman.
layers: Vec::new(),
})
}
}
Expand Down
198 changes: 110 additions & 88 deletions crates/pulsar-core/src/pdk/process_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::collections::{BTreeMap, HashMap};
use std::{
collections::{BTreeMap, HashMap},
future::Future,
pin::Pin,
};

use bpf_common::{
containers::{ContainerId, ContainerInfo},
Expand Down Expand Up @@ -209,7 +213,7 @@ impl ProcessTracker {
tokio::select! {
msg = self.rx.recv() => match msg {
Some(msg) => {
self.handle_message(msg);
self.handle_message(msg).await;
self.cleanup();
// We check pending requests here and not periodically because
// the only way we can get a response is by handling a message.
Expand All @@ -224,9 +228,11 @@ impl ProcessTracker {
}
}

fn handle_message(&mut self, req: TrackerRequest) {
async fn handle_message(&mut self, req: TrackerRequest) {
match req {
TrackerRequest::UpdateProcess(update) => self.handle_update(update),
TrackerRequest::UpdateProcess(update) => {
self.handle_update(update).await;
}
TrackerRequest::GetProcessInfo(info_request) => {
let r = self.get_info(info_request.pid, info_request.ts);
match r {
Expand Down Expand Up @@ -263,100 +269,116 @@ impl ProcessTracker {
}
}

fn handle_update(&mut self, mut update: TrackerUpdate) {
match update {
TrackerUpdate::Fork {
pid,
uid,
gid,
timestamp,
ppid,
namespaces,
container_id,
} => {
let container =
container_id.and_then(|c_id| {
match ContainerInfo::from_container_id(c_id, uid) {
Ok(container) => container,
Err(err) => {
log::error!("{err}");
None
fn handle_update<'a>(
&'a mut self,
mut update: TrackerUpdate,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
Box::pin(async move {
match update {
TrackerUpdate::Fork {
pid,
uid,
gid,
timestamp,
ppid,
namespaces,
container_id,
} => {
let container = match container_id {
Some(container_id) => {
match ContainerInfo::from_container_id(container_id.clone(), uid).await
{
Ok(container) => container,
Err(err) => {
log::error!("{err}");
None
}
}
}
});
None => None,
};

self.processes.insert(
pid,
ProcessData {
ppid,
uid,
gid,
fork_time: timestamp,
exit_time: None,
original_image: self.get_image(ppid, timestamp),
exec_changes: BTreeMap::new(),
argv: self
.processes
.get(&ppid)
.map(|parent| parent.argv.clone())
.unwrap_or_default(),
namespaces,
container,
},
);
if let Some(pending_updates) = self.pending_updates.remove(&pid) {
pending_updates
.into_iter()
.for_each(|update| self.handle_update(update));
self.processes.insert(
pid,
ProcessData {
ppid,
uid,
gid,
fork_time: timestamp,
exit_time: None,
original_image: self.get_image(ppid, timestamp),
exec_changes: BTreeMap::new(),
argv: self
.processes
.get(&ppid)
.map(|parent| parent.argv.clone())
.unwrap_or_default(),
namespaces,
container,
},
);
if let Some(pending_updates) = self.pending_updates.remove(&pid) {
for update in pending_updates {
self.handle_update(update).await;
}
}
}
}
TrackerUpdate::Exec {
pid,
uid,
timestamp,
ref mut image,
ref mut argv,
namespaces,
ref container_id,
} => {
let container = container_id.clone().and_then(|c_id| {
match ContainerInfo::from_container_id(c_id, uid) {
Ok(container) => container,
Err(err) => {
log::error!("{err}");
None
TrackerUpdate::Exec {
pid,
uid,
timestamp,
ref mut image,
ref mut argv,
namespaces,
ref container_id,
} => {
let container = match container_id {
Some(container_id) => {
match ContainerInfo::from_container_id(container_id.clone(), uid).await
{
Ok(container) => container,
Err(err) => {
log::error!("{err}");
None
}
}
}
None => None,
};

if let Some(p) = self.processes.get_mut(&pid) {
p.exec_changes.insert(timestamp, std::mem::take(image));
p.argv = std::mem::take(argv);
p.namespaces = namespaces;
p.container = container;
} else {
// if exec arrived before the fork, we save the event as pending
log::debug!(
"(exec) Process {pid} not found in process tree, saving for later"
);
self.pending_updates.entry(pid).or_default().push(update);
}
});

if let Some(p) = self.processes.get_mut(&pid) {
p.exec_changes.insert(timestamp, std::mem::take(image));
p.argv = std::mem::take(argv);
p.namespaces = namespaces;
p.container = container;
} else {
// if exec arrived before the fork, we save the event as pending
log::debug!("(exec) Process {pid} not found in process tree, saving for later");
self.pending_updates.entry(pid).or_default().push(update);
}
}
TrackerUpdate::Exit { pid, timestamp } => {
if let Some(p) = self.processes.get_mut(&pid) {
p.exit_time = Some(timestamp);
} else {
// if exit arrived before the fork, we save the event as pending
log::debug!("(exit) Process {pid} not found in process tree, saving for later");
self.pending_updates.entry(pid).or_default().push(update);
TrackerUpdate::Exit { pid, timestamp } => {
if let Some(p) = self.processes.get_mut(&pid) {
p.exit_time = Some(timestamp);
} else {
// if exit arrived before the fork, we save the event as pending
log::debug!(
"(exit) Process {pid} not found in process tree, saving for later"
);
self.pending_updates.entry(pid).or_default().push(update);
}
}
}
TrackerUpdate::SetNewParent { pid, ppid } => {
if let Some(p) = self.processes.get_mut(&pid) {
p.ppid = ppid;
} else {
log::warn!("{ppid} is the new parent of {pid}, but we couldn't find it")
TrackerUpdate::SetNewParent { pid, ppid } => {
if let Some(p) = self.processes.get_mut(&pid) {
p.ppid = ppid;
} else {
log::warn!("{ppid} is the new parent of {pid}, but we couldn't find it")
}
}
}
}
})
}

fn get_info(&self, pid: Pid, ts: Timestamp) -> Result<ProcessInfo, TrackerError> {
Expand Down

0 comments on commit 2774c52

Please sign in to comment.