Skip to content

Commit

Permalink
Merge pull request #1277 from weaveworks/ebpf-plugin-improvements
Browse files Browse the repository at this point in the history
eBPF plugin improvements
  • Loading branch information
Alfonso Acosta committed Apr 13, 2016
2 parents 398258f + ef94818 commit 151567e
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 47 deletions.
155 changes: 128 additions & 27 deletions examples/plugins/http-requests/http-requests.c
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
#include <uapi/linux/ptrace.h>
#include <linux/skbuff.h>
#include <net/sock.h>

struct received_http_requests_key_t {
u32 pid;
};
BPF_HASH(received_http_requests, struct received_http_requests_key_t, u64);
/* Table from (Task group id|Task id) to (Number of received http requests).
We need to gather requests per task and not only per task group (i.e. userspace pid)
so that entries can be cleared up independently when a task exists.
This implies that userspace needs to do the per-process aggregation.
*/
BPF_HASH(received_http_requests, u64, u64);


/*
skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
buffers from kernel to userspace.
/* skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
buffers from kernel to userspace.
skb_copy_datagram_iter() has an associated tracepoint
(trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
it lacks the offset argument.
skb_copy_datagram_iter() has an associated tracepoint
(trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
it lacks the offset argument.
*/
int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *skb, int offset, void *unused_iovec, int len)
{

/* Inspect the beginning of socket buffers copied to user-space to determine if they
correspond to http requests.
/* Inspect the beginning of socket buffers copied to user-space to determine
if they correspond to http requests.
Caveats:
Expand All @@ -30,32 +31,132 @@ int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *sk
We could inspect the full packet but:
* It's very inefficient.
* Examining the non-linear (paginated) area of a socket buffer would be
really tricky from ebpf.
*/
really tricky from ebpf.
*/

/* TODO: exit early if it's not TCP */
/* Verify it's a TCP socket
TODO: is it worth caching it in a socket table?
*/
struct sock *sk = skb->sk;
unsigned short skc_family = sk->__sk_common.skc_family;
switch (skc_family) {
case PF_INET:
case PF_INET6:
case PF_UNIX:
break;
default:
return 0;
}
/* The socket type and procotol are not directly addressable since they are
bitfields. We access them by assuming sk_write_queue is immediately before
them (admittedly pretty hacky).
*/
unsigned int flags = 0;
size_t flags_offset = offsetof(typeof(struct sock), sk_write_queue) + sizeof(sk->sk_write_queue);
bpf_probe_read(&flags, sizeof(flags), ((u8*)sk) + flags_offset);
u16 sk_type = flags >> 16;
if (sk_type != SOCK_STREAM) {
return 0;
}
u8 sk_protocol = flags >> 8 & 0xFF;
/* The protocol is unset (IPPROTO_IP) in Unix sockets */
if ( (sk_protocol != IPPROTO_TCP) && ((skc_family == PF_UNIX) && (sk_protocol != IPPROTO_IP)) ) {
return 0;
}

/* Inline implementation of skb_headlen() */
unsigned int head_len = skb->len - skb->data_len;

/* http://stackoverflow.com/questions/25047905/http-request-minimum-size-in-bytes
minimum length of http request is always geater than 7 bytes
minimum length of http request is always greater than 7 bytes
*/
if (head_len - offset < 7) {
unsigned int available_data = head_len - offset;
if (available_data < 7) {
return 0;
}

u8 data[4] = {};
bpf_probe_read(&data, sizeof(data), skb->data + offset);
/* Check if buffer begins with a method name followed by a space.
/* TODO: support other methods and optimize lookups */
if ((data[0] == 'G') && (data[1] == 'E') && (data[2] == 'T') && (data[3] == ' ')) {
/* Record request */
struct received_http_requests_key_t key = {};
key.pid = bpf_get_current_pid_tgid() >> 32;
received_http_requests.increment(key);
To avoid false positives it would be good to do a deeper inspection
(i.e. fully ensure a 'Method SP Request-URI SP HTTP-Version CRLF'
structure) but loops are not allowed in ebpf, making variable-size-data
parsers infeasible.
*/
u8 data[8] = {};
if (available_data >= 8) {
/* We have confirmed having access to 7 bytes, but need 8 bytes to check the
space after OPTIONS. bpf_probe_read() requires its second argument to be
an immediate, so we obtain the data in this unsexy way.
*/
bpf_probe_read(&data, 8, skb->data + offset);
} else {
bpf_probe_read(&data, 7, skb->data + offset);
}

switch (data[0]) {
/* DELETE */
case 'D':
if ((data[1] != 'E') || (data[2] != 'L') || (data[3] != 'E') || (data[4] != 'T') || (data[5] != 'E') || (data[6] != ' ')) {
return 0;
}
break;

/* GET */
case 'G':
if ((data[1] != 'E') || (data[2] != 'T') || (data[3] != ' ')) {
return 0;
}
break;

/* HEAD */
case 'H':
if ((data[1] != 'E') || (data[2] != 'A') || (data[3] != 'D') || (data[4] != ' ')) {
return 0;
}
break;

/* OPTIONS */
case 'O':
if (available_data < 8 || (data[1] != 'P') || (data[2] != 'T') || (data[3] != 'I') || (data[4] != 'O') || (data[5] != 'N') || (data[6] != 'S') || (data[7] != ' ')) {
return 0;
}
break;

/* PATCH/POST/PUT */
case 'P':
switch (data[1]) {
case 'A':
if ((data[2] != 'T') || (data[3] != 'C') || (data[4] != 'H') || (data[5] != ' ')) {
return 0;
}
break;
case 'O':
if ((data[2] != 'S') || (data[3] != 'T') || (data[4] != ' ')) {
return 0;
}
break;
case 'U':
if ((data[2] != 'T') || (data[3] != ' ')) {
return 0;
}
break;
}
break;

default:
return 0;
}

/* Finally, bump the request counter for current task */
u64 pid_tgid = bpf_get_current_pid_tgid();
received_http_requests.increment(pid_tgid);

return 0;
}


/* Clear out request count entries of tasks on exit */
int kprobe__do_exit(struct pt_regs *ctx) {
u64 pid_tgid = bpf_get_current_pid_tgid();
received_http_requests.delete(&pid_tgid);
return 0;
}
42 changes: 22 additions & 20 deletions examples/plugins/http-requests/http-requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import bcc

import time
import collections
import datetime
import os
import signal
Expand All @@ -26,17 +27,24 @@ def __init__(self):
self.lock = threading.Lock()

def update_http_rate_per_pid(self, last_req_count_snapshot):
new_req_count_snapshot = dict()
new_http_rate_per_pid = dict()
# Aggregate the kernel's per-task http request counts into userland's
# per-process counts
req_count_table = self.bpf.get_table(EBPF_TABLE_NAME)
for key, value in req_count_table.iteritems():
request_delta = value.value
if key.pid in last_req_count_snapshot:
request_delta -= last_req_count_snapshot[key.pid]
if request_delta > 0:
new_http_rate_per_pid[key.pid] = request_delta

new_req_count_snapshot[key.pid] = value.value
new_req_count_snapshot = collections.defaultdict(int)
for pid_tgid, req_count in req_count_table.iteritems():
# Note that the kernel's tgid maps into userland's pid
# (not to be confused by the kernel's pid, which is
# the unique identifier of a kernel task)
pid = pid_tgid.value >> 32
new_req_count_snapshot[pid] += req_count.value

# Compute request rate
new_http_rate_per_pid = dict()
for pid, req_count in new_req_count_snapshot.iteritems():
request_delta = req_count
if pid in last_req_count_snapshot:
request_delta -= last_req_count_snapshot[pid]
new_http_rate_per_pid[pid] = request_delta

self.lock.acquire()
self.http_rate_per_pid = new_http_rate_per_pid
Expand All @@ -52,17 +60,11 @@ def on_http_rate_per_pid(self, f):

def run(self):
# Compute request rates based on the requests counts from the last
# second. It would be simpler to clear the table, wait one second
# second. It would be simpler to clear the table, wait one second but
# clear() is expensive (each entry is individually cleared with a system
# call) and less robust (clearing contends with the increments done by
# the kernel probe).
# FIXME: we need a mechanism to garbage-collect old processes, either
# here or on the probe. Some options are clearing the table once
# in a while (not ideal for the reasons above) or adding another
# probe to remove processes from the table when they die (this
# will probably require keeping keeping track of tasks and not
# just processes)
req_count_snapshot = dict()
# call) and less robust (it contends with the increments done by the
# kernel probe).
req_count_snapshot = collections.defaultdict(int)
while True:
time.sleep(1)
req_count_snapshot = self.update_http_rate_per_pid(req_count_snapshot)
Expand Down

0 comments on commit 151567e

Please sign in to comment.