From c4eef9e265d836947f93b22d7fa8cbf15aecf0ed Mon Sep 17 00:00:00 2001
From: Lorenzo Manacorda
Date: Fri, 28 Oct 2016 14:58:47 +0200
Subject: [PATCH] Add eBPF connection tracking

Introduces connection tracking via eBPF. This lets Scope be notified of
every connection event without having to parse /proc/$pid/net/tcp{,6}
and /proc/$pid/fd/*, and therefore improves performance.

The eBPF program is in a Python script using bcc: docker/tcpv4tracer.py.
It has been contributed upstream via https://github.com/iovisor/bcc/pull/762

It uses kprobes and kretprobes on the following kernel functions:
- tcp_v4_connect
- inet_csk_accept
- tcp_close

It generates "connect", "accept" and "close" events containing the
connection tuple as well as the pid and the netns.

The Python script's output is piped into the Scope Probe, and
probe/endpoint/ebpf.go maintains the list of connections. As with
conntrack, we keep dead connections around for one iteration in order
to report short-lived connections.

The code for parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/* is
still there and is still used at start-up, because eBPF only brings us
the events and not the initial state. However, the /proc parsing for
the initial state is now done in the foreground instead of the
background, via newForegroundReader().

The Scope Probe falls back on the old /proc parsing if eBPF is not
working (e.g. the kernel is too old or the kernel headers are missing).
The new flag "probe.ebpf.connections" (default true) disables eBPF
tracking when set to false.

NAT resolution on connections from eBPF works in the same way as it did
on connections from /proc: by using conntrack.

The Scope Docker image is bigger because we need a few more packages
for bcc:
- weaveworks/scope in current master: 22 MB
- weaveworks/scope with this patch: 147 MB

Limitations:
- [ ] Does not support IPv6
- [ ] Sets `procspied: true` on connections coming from eBPF
- [ ] Size of the Docker image: ~6 times bigger
- [ ] Requirement on kernel headers
- [ ] Location of kernel headers: https://github.com/iovisor/bcc/issues/743

Fixes https://github.com/weaveworks/scope/issues/1168
(walking /proc to obtain connections is very expensive)

Fixes https://github.com/weaveworks/scope/issues/1260
(Short-lived connections not tracked for containers in shared
networking namespaces)
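For reference, the probe consumes the tracer's stdout line by line: a
header, then one event per line with seven whitespace-separated fields
(type, pid, saddr, daddr, sport, dport, netns), matching the parser in
probe/endpoint/ebpf.go. A hypothetical capture (pids, addresses and
ports are illustrative):

    TYPE PID SADDR DADDR SPORT DPORT NETNS
    connect 4242 10.32.0.1 10.32.0.2 53756 4040 4026531957
    accept 4243 10.32.0.2 10.32.0.1 4040 53756 4026531957
    close 4242 10.32.0.1 10.32.0.2 53756 4040 4026531957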
---
 docker/Dockerfile                      |   8 +-
 docker/tcpv4tracer.py                  | 291 ++++++++++++++++++
 probe/endpoint/ebpf.go                 | 229 ++++++++++++++
 probe/endpoint/ebpf/main.go            |  27 ++
 probe/endpoint/four_tuple.go           |  43 +++
 .../procspy/background_reader_linux.go |  23 ++
 probe/endpoint/procspy/spy_darwin.go   |   5 +
 probe/endpoint/procspy/spy_linux.go    |   6 +
 probe/endpoint/reporter.go             |  89 ++++--
 prog/main.go                           |   2 +
 prog/probe.go                          |   9 +-
 scope                                  |   3 +
 12 files changed, 702 insertions(+), 33 deletions(-)
 create mode 100755 docker/tcpv4tracer.py
 create mode 100644 probe/endpoint/ebpf.go
 create mode 100644 probe/endpoint/ebpf/main.go
 create mode 100644 probe/endpoint/four_tuple.go

diff --git a/docker/Dockerfile b/docker/Dockerfile
index 029177008b..cf7e15eb52 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,14 +1,12 @@
-FROM alpine:3.3
+FROM zlim/bcc
 MAINTAINER Weaveworks Inc
 LABEL works.weave.role=system
 WORKDIR /home/weave
-RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/community" >>/etc/apk/repositories && \
-    apk add --update bash runit conntrack-tools iproute2 util-linux curl && \
-    rm -rf /var/cache/apk/*
+RUN apt-get update -y && apt-get install -y bash runit conntrack iproute2 util-linux curl python bcc-tools python-bcc libbcc
 ADD ./docker.tgz /
 ADD ./demo.json /
 ADD ./weave /usr/bin/
-COPY ./scope ./runsvinit ./entrypoint.sh /home/weave/
+COPY ./scope ./runsvinit ./entrypoint.sh ./tcpv4tracer.py /home/weave/
 COPY ./run-app /etc/service/app/run
 COPY ./run-probe /etc/service/probe/run
 EXPOSE 4040
diff --git a/docker/tcpv4tracer.py b/docker/tcpv4tracer.py
new file mode 100755
index 0000000000..fc7eca36ea
--- /dev/null
+++ b/docker/tcpv4tracer.py
@@ -0,0 +1,291 @@
+#!/usr/bin/python
+#
+# tcpv4tracer   Trace TCP IPv4 connections.
+#               For Linux, uses BCC, eBPF. Embedded C.
+#
+# USAGE: tcpv4tracer [-h] [-p PID]
+#
+from __future__ import print_function
+from bcc import BPF
+
+import argparse
+import ctypes
+
+parser = argparse.ArgumentParser(
+    description="Trace TCP IPv4 connections",
+    formatter_class=argparse.RawDescriptionHelpFormatter)
+parser.add_argument("-p", "--pid",
+    help="trace this PID only")
+args = parser.parse_args()
+
+# define BPF program
+bpf_text = """
+#include <uapi/linux/ptrace.h>
+#include <net/sock.h>
+#include <net/inet_sock.h>
+#include <net/net_namespace.h>
+#include <bcc/proto.h>
+
+#define TCP_EVENT_TYPE_CONNECT 1
+#define TCP_EVENT_TYPE_ACCEPT  2
+#define TCP_EVENT_TYPE_CLOSE   3
+
+struct tcp_event_t {
+    u32 type;
+    u32 netns;
+    u32 pid;
+    u32 saddr;
+    u32 daddr;
+    u16 sport;
+    u16 dport;
+};
+
+BPF_PERF_OUTPUT(tcp_event);
+BPF_HASH(connectsock, u64, struct sock *);
+BPF_HASH(closesock, u64, struct sock *);
+
+int kprobe__tcp_v4_connect(struct pt_regs *ctx, struct sock *sk)
+{
+    u64 pid = bpf_get_current_pid_tgid();
+
+    ##FILTER_PID##
+
+    // stash the sock ptr for lookup on return
+    connectsock.update(&pid, &sk);
+
+    return 0;
+};
+
+int kretprobe__tcp_v4_connect(struct pt_regs *ctx)
+{
+    int ret = PT_REGS_RC(ctx);
+    u64 pid = bpf_get_current_pid_tgid();
+
+    struct sock **skpp;
+    skpp = connectsock.lookup(&pid);
+    if (skpp == 0) {
+        return 0;   // missed entry
+    }
+
+    if (ret != 0) {
+        // failed to send SYN packet, may not have populated
+        // socket __sk_common.{skc_rcv_saddr, ...}
+        connectsock.delete(&pid);
+        return 0;
+    }
+
+    // pull in details
+    struct sock *skp = *skpp;
+    u32 saddr = 0, daddr = 0;
+    u16 sport = 0, dport = 0;
+    u32 net_ns_inum = 0;
+    bpf_probe_read(&sport, sizeof(sport), &((struct inet_sock *)skp)->inet_sport);
+    bpf_probe_read(&saddr, sizeof(saddr), &skp->__sk_common.skc_rcv_saddr);
+    bpf_probe_read(&daddr, sizeof(daddr), &skp->__sk_common.skc_daddr);
+    bpf_probe_read(&dport, sizeof(dport), &skp->__sk_common.skc_dport);
+
+// Get network namespace id, if kernel supports it
+#ifdef CONFIG_NET_NS
+    possible_net_t skc_net = {0,};
+    bpf_probe_read(&skc_net, sizeof(skc_net), &skp->__sk_common.skc_net);
+    bpf_probe_read(&net_ns_inum, sizeof(net_ns_inum), &skc_net.net->ns.inum);
+#else
+    net_ns_inum = 0;
+#endif
+
+    // output
+    struct tcp_event_t evt = {
+        .type = TCP_EVENT_TYPE_CONNECT,
+        .pid = pid >> 32,
+        .saddr = saddr,
+        .daddr = daddr,
+        .sport = ntohs(sport),
+        .dport = ntohs(dport),
+        .netns = net_ns_inum,
+    };
+
+    u16 family = 0;
+    bpf_probe_read(&family, sizeof(family), &skp->__sk_common.skc_family);
+
+    tcp_event.perf_submit(ctx, &evt, sizeof(evt));
+
+    connectsock.delete(&pid);
+
+    return 0;
+}
+
+int kprobe__tcp_close(struct pt_regs *ctx, struct sock *sk)
+{
+    u64 pid = bpf_get_current_pid_tgid();
+
+    ##FILTER_PID##
+
+    // stash the sock ptr for lookup on return
+    closesock.update(&pid, &sk);
+
+    return 0;
+};
+
+int kretprobe__tcp_close(struct pt_regs *ctx)
+{
+    u64 pid = bpf_get_current_pid_tgid();
+
+    struct sock **skpp;
+    skpp = closesock.lookup(&pid);
+    if (skpp == 0) {
+        return 0;   // missed entry
+    }
+
+    // pull in details
+    struct sock *skp = *skpp;
+    u32 saddr = 0, daddr = 0;
+    u16 sport = 0, dport = 0;
+    u32 net_ns_inum = 0;
+    bpf_probe_read(&saddr, sizeof(saddr), &skp->__sk_common.skc_rcv_saddr);
+    bpf_probe_read(&daddr, sizeof(daddr), &skp->__sk_common.skc_daddr);
+    bpf_probe_read(&sport, sizeof(sport), &((struct inet_sock *)skp)->inet_sport);
+    bpf_probe_read(&dport, sizeof(dport), &skp->__sk_common.skc_dport);
+
+// Get network namespace id, if kernel supports it
+#ifdef CONFIG_NET_NS
+    possible_net_t skc_net = {0,};
+    bpf_probe_read(&skc_net, sizeof(skc_net), &skp->__sk_common.skc_net);
+    bpf_probe_read(&net_ns_inum, sizeof(net_ns_inum), &skc_net.net->ns.inum);
+#else
+    net_ns_inum = 0;
+#endif
+
+    // output
+    struct tcp_event_t evt = {
+        .type = TCP_EVENT_TYPE_CLOSE,
+        .pid = pid >> 32,
+        .saddr = saddr,
+        .daddr = daddr,
+        .sport = ntohs(sport),
+        .dport = ntohs(dport),
+        .netns = net_ns_inum,
+    };
+
+    u16 family = 0;
+    bpf_probe_read(&family, sizeof(family), &skp->__sk_common.skc_family);
+
+    // do not send event if IP address is 0.0.0.0 or port is 0
+    if (evt.saddr != 0 && evt.daddr != 0 && evt.sport != 0 && evt.dport != 0) {
+        tcp_event.perf_submit(ctx, &evt, sizeof(evt));
+    }
+
+    closesock.delete(&pid);
+
+    return 0;
+}
+
+int kretprobe__inet_csk_accept(struct pt_regs *ctx)
+{
+    struct sock *newsk = (struct sock *)PT_REGS_RC(ctx);
+    u64 pid = bpf_get_current_pid_tgid();
+
+    ##FILTER_PID##
+
+    if (newsk == NULL)
+        return 0;
+
+    // check this is TCP
+    u8 protocol = 0;
+    // workaround for reading the sk_protocol bitfield:
+    bpf_probe_read(&protocol, 1, (void *)((long)&newsk->sk_wmem_queued) - 3);
+    if (protocol != IPPROTO_TCP)
+        return 0;
+
+    // pull in details
+    u16 family = 0, lport = 0, dport = 0;
+    u32 net_ns_inum = 0;
+    bpf_probe_read(&family, sizeof(family), &newsk->__sk_common.skc_family);
+    bpf_probe_read(&lport, sizeof(lport), &newsk->__sk_common.skc_num);
+    bpf_probe_read(&dport, sizeof(dport), &newsk->__sk_common.skc_dport);
+
+// Get network namespace id, if kernel supports it
+#ifdef CONFIG_NET_NS
+    possible_net_t skc_net = {0,};
+    bpf_probe_read(&skc_net, sizeof(skc_net), &newsk->__sk_common.skc_net);
+    bpf_probe_read(&net_ns_inum, sizeof(net_ns_inum), &skc_net.net->ns.inum);
+#else
+    net_ns_inum = 0;
+#endif
+
+    if (family == AF_INET) {
+        struct tcp_event_t evt = {
+            .type = TCP_EVENT_TYPE_ACCEPT,
+            .pid = pid >> 32,
+            .netns = net_ns_inum,
+        };
+
+        bpf_probe_read(&evt.saddr, sizeof(u32),
+            &newsk->__sk_common.skc_rcv_saddr);
+        bpf_probe_read(&evt.daddr, sizeof(u32),
+            &newsk->__sk_common.skc_daddr);
+        evt.sport = lport;
+        evt.dport = ntohs(dport);
+        tcp_event.perf_submit(ctx, &evt, sizeof(evt));
+    }
+    // else drop
+
+    return 0;
+}
+"""
+
+class TCPEvt(ctypes.Structure):
+    _fields_ = [
+        ("type", ctypes.c_uint),
+        ("netns", ctypes.c_uint),
+        ("pid", ctypes.c_uint),
+        ("saddr", ctypes.c_uint),
+        ("daddr", ctypes.c_uint),
+        ("sport", ctypes.c_ushort),
+        ("dport", ctypes.c_ushort),
+    ]
+
+def print_event(cpu, data, size):
+    event = ctypes.cast(data, ctypes.POINTER(TCPEvt)).contents
+    if event.type == 1:
+        type_str = "connect"
+    elif event.type == 2:
+        type_str = "accept"
+    elif event.type == 3:
+        type_str = "close"
+    else:
+        type_str = "unknown-" + str(event.type)
+
+    print("%s %s %s %s %s %s %s" % (type_str, event.pid,
+        inet_ntoa(event.saddr),
+        inet_ntoa(event.daddr),
+        event.sport,
+        event.dport,
+        event.netns,
+        ))
+
+if args.pid:
+    # the eBPF "pid" variable holds the full pid_tgid value, so compare
+    # against its upper 32 bits (the tgid), which is what the events report
+    bpf_text = bpf_text.replace('##FILTER_PID##',
+        'if (pid >> 32 != %s) { return 0; }' % args.pid)
+else:
+    bpf_text = bpf_text.replace('##FILTER_PID##', '')
+
+# initialize BPF
+b = BPF(text=bpf_text)
+
+# header
+print("TYPE PID SADDR DADDR SPORT DPORT NETNS")
+
+def inet_ntoa(addr):
+    dq = ''
+    for i in range(0, 4):
+        dq = dq + str(addr & 0xff)
+        if i != 3:
+            dq = dq + '.'
+        addr = addr >> 8
+    return dq
+
+b["tcp_event"].open_perf_buffer(print_event)
+while True:
+    b.kprobe_poll()
diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go
new file mode 100644
index 0000000000..f0d5ca7f1b
--- /dev/null
+++ b/probe/endpoint/ebpf.go
@@ -0,0 +1,229 @@
+package endpoint
+
+import (
+    "bufio"
+    "net"
+    "os"
+    "os/exec"
+    "strconv"
+    "strings"
+    "sync"
+
+    log "github.com/Sirupsen/logrus"
+)
+
+// TCPV4TracerLocation is the location of the Python script
+// that delivers the eBPF events coming from the kernel.
+// The script is located inside the Docker container in which Scope executes.
+var TCPV4TracerLocation = "/home/weave/tcpv4tracer.py"
+
+// An ebpfConnection represents a TCP connection
+type ebpfConnection struct {
+    tuple            fourTuple
+    networkNamespace string
+    incoming         bool
+    pid              int
+}
+
+type eventTracker interface {
+    handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string)
+    hasDied() bool
+    run()
+    walkConnections(f func(ebpfConnection))
+    initialize()
+    isInitialized() bool
+}
+
+// nilTracker is a no-op tracker implementing the eventTracker interface.
+// It is returned when the useEbpfConn flag is false, or when the real
+// tracker cannot be started.
+type nilTracker struct{}
+
+func (n nilTracker) handleConnection(_ string, _ fourTuple, _ int, _ string) {}
+func (n nilTracker) hasDied() bool                                           { return true }
+func (n nilTracker) run()                                                    {}
+func (n nilTracker) walkConnections(f func(ebpfConnection))                  {}
+func (n nilTracker) initialize()                                             {}
+func (n nilTracker) isInitialized() bool                                     { return false }
+
+// EbpfTracker contains the sets of open and closed TCP connections.
+// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`.
+type EbpfTracker struct {
+    sync.Mutex
+    // the eBPF script command
+    cmd *exec.Cmd
+
+    initialized bool
+    dead        bool
+
+    openConnections   map[string]ebpfConnection
+    closedConnections []ebpfConnection
+}
+
+func newEbpfTracker(useEbpfConn bool) eventTracker {
+    if !useEbpfConn {
+        return &nilTracker{}
+    }
+    cmd := exec.Command(TCPV4TracerLocation)
+    env := os.Environ()
+    cmd.Env = append(env, "PYTHONUNBUFFERED=1")
+
+    stderr, err := cmd.StderrPipe()
+    if err != nil {
+        log.Errorf("EbpfTracker error: %v", err)
+        // fall back to the no-op tracker so callers never get a nil interface
+        return &nilTracker{}
+    }
+    go logPipe("EbpfTracker stderr:", stderr)
+
+    tracker := &EbpfTracker{
+        cmd:             cmd,
+        openConnections: map[string]ebpfConnection{},
+    }
+    log.Info("EbpfTracker started")
+    go tracker.run()
+    return tracker
+}
+
+func (t *EbpfTracker) handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) {
+    t.Lock()
+    defer t.Unlock()
+
+    switch eventType {
+    case "connect":
+        conn := ebpfConnection{
+            incoming:         false,
+            tuple:            tuple,
+            pid:              pid,
+            networkNamespace: networkNamespace,
+        }
+        t.openConnections[tuple.String()] = conn
+    case "accept":
+        conn := ebpfConnection{
+            incoming:         true,
+            tuple:            tuple,
+            pid:              pid,
+            networkNamespace: networkNamespace,
+        }
+        t.openConnections[tuple.String()] = conn
+    case "close":
+        if deadConn, ok := t.openConnections[tuple.String()]; ok {
+            delete(t.openConnections, tuple.String())
+            t.closedConnections = append(t.closedConnections, deadConn)
+        } else {
+            log.Errorf("EbpfTracker error: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
+        }
+    }
+}
+
+func (t *EbpfTracker) run() {
+    stdout, err := t.cmd.StdoutPipe()
+    if err != nil {
+        log.Errorf("EbpfTracker error: %v", err)
+        return
+    }
+
+    if err := t.cmd.Start(); err != nil {
+        log.Errorf("EbpfTracker error: %v", err)
+        return
+    }
+
+    defer func() {
+        if err := t.cmd.Wait(); err != nil {
+            log.Errorf("EbpfTracker error: %v", err)
+        }
+
+        t.Lock()
+        t.dead = true
+        t.Unlock()
+    }()
+
+    reader := bufio.NewReader(stdout)
+    // skip the first line of the output table, containing the headers
+    if _, err := reader.ReadString('\n'); err != nil {
+        log.Errorf("EbpfTracker error: %v", err)
+        return
+    }
+
+    defer log.Infof("EbpfTracker exiting")
+
+    scn := bufio.NewScanner(reader)
+    for scn.Scan() {
+        txt := scn.Text()
+        line := strings.Fields(txt)
+
+        if len(line) != 7 {
+            log.Errorf("error parsing line %q", txt)
+            continue
+        }
+
+        eventType := line[0]
+
+        pid, err := strconv.Atoi(line[1])
+        if err != nil {
+            log.Errorf("error parsing pid %q: %v", line[1], err)
+            continue
+        }
+
+        sourceAddr := net.ParseIP(line[2])
+        if sourceAddr == nil {
+            log.Errorf("error parsing sourceAddr %q", line[2])
+            continue
+        }
+
+        destAddr := net.ParseIP(line[3])
+        if destAddr == nil {
+            log.Errorf("error parsing destAddr %q", line[3])
+            continue
+        }
+
+        sPort, err := strconv.ParseUint(line[4], 10, 16)
+        if err != nil {
+            log.Errorf("error parsing sourcePort %q: %v", line[4], err)
+            continue
+        }
+        sourcePort := uint16(sPort)
+
+        dPort, err := strconv.ParseUint(line[5], 10, 16)
+        if err != nil {
+            log.Errorf("error parsing destPort %q: %v", line[5], err)
+            continue
+        }
+        destPort := uint16(dPort)
+
+        networkNamespace := line[6]
+
+        tuple := fourTuple{sourceAddr.String(), destAddr.String(), sourcePort, destPort}
+
+        t.handleConnection(eventType, tuple, pid, networkNamespace)
+    }
+}
+
+// walkConnections calls f with all open connections and connections that have come and gone
+// since the last call to walkConnections
+func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
+    t.Lock()
+    defer t.Unlock()
+
+    for _, connection := range t.openConnections {
+        f(connection)
+    }
+    for _, connection := range t.closedConnections {
+        f(connection)
+    }
+    t.closedConnections = t.closedConnections[:0]
+}
+
+func (t *EbpfTracker) hasDied() bool {
+    t.Lock()
+    defer t.Unlock()
+
+    return t.dead
+}
+
+func (t *EbpfTracker) initialize() {
+    t.initialized = true
+}
+
+func (t *EbpfTracker) isInitialized() bool {
+    return t.initialized
+}
diff --git a/probe/endpoint/ebpf/main.go b/probe/endpoint/ebpf/main.go
new file mode 100644
index 0000000000..92fcdbebd4
--- /dev/null
+++ b/probe/endpoint/ebpf/main.go
@@ -0,0 +1,27 @@
+// Manual test harness kept from development: it exercises the tracker
+// standalone and is not built into the probe. Note that it predates the
+// current API in probe/endpoint/ebpf.go (newEbpfTracker is unexported
+// and no longer takes a script path), so it is for reference only.
+package main
+
+import (
+    "fmt"
+    "os"
+    "time"
+
+    "github.com/weaveworks/scope/probe/endpoint"
+)
+
+func main() {
+    tr := endpoint.NewEbpfTracker("/home/asymmetric/code/kinvolk/bcc/examples/tracing/tcpv4tracer.py")
+
+    if tr == nil {
+        fmt.Fprintf(os.Stderr, "error creating tracker\n")
+        os.Exit(1)
+    }
+
+    // create some HTTP connections within these 10 seconds
+    time.Sleep(10 * time.Second)
+
+    tr.WalkEvents(func(e endpoint.ConnectionEvent) {
+        fmt.Println(e)
+    })
+
+    fmt.Println("done")
+}
diff --git a/probe/endpoint/four_tuple.go b/probe/endpoint/four_tuple.go
new file mode 100644
index 0000000000..ff2f80e0c2
--- /dev/null
+++ b/probe/endpoint/four_tuple.go
@@ -0,0 +1,43 @@
+package endpoint
+
+import (
+    "fmt"
+    "sort"
+    "strings"
+)
+
+// fourTuple is an (IP, port, IP, port) tuple, representing a connection
+type fourTuple struct {
+    fromAddr, toAddr string
+    fromPort, toPort uint16
+}
+
+func (t fourTuple) String() string {
+    return fmt.Sprintf("%s:%d-%s:%d", t.fromAddr, t.fromPort, t.toAddr, t.toPort)
+}
+
+// key is a sortable direction-independent key for tuples, used to look up a
+// fourTuple when you are unsure of its direction.
+func (t fourTuple) key() string {
+    key := []string{
+        fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
+        fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
+    }
+    sort.Strings(key)
+    return strings.Join(key, " ")
+}
+
+// reverse flips the direction of the tuple in place
+func (t *fourTuple) reverse() {
+    t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort
+}
+
+// reverse flips the direction of a tuple, without side effects
+func reverse(tuple fourTuple) fourTuple {
+    return fourTuple{
+        fromAddr: tuple.toAddr,
+        toAddr:   tuple.fromAddr,
+        fromPort: tuple.toPort,
+        toPort:   tuple.fromPort,
+    }
+}
diff --git a/probe/endpoint/procspy/background_reader_linux.go b/probe/endpoint/procspy/background_reader_linux.go
index ce4ee2a4f6..6fcfdd6934 100644
--- a/probe/endpoint/procspy/background_reader_linux.go
+++ b/probe/endpoint/procspy/background_reader_linux.go
@@ -38,6 +38,28 @@ func newBackgroundReader(walker process.Walker) *backgroundReader {
 	return br
 }
 
+// newForegroundReader performs a single /proc walk synchronously, so
+// callers start out with a complete snapshot of the current connections.
+func newForegroundReader(walker process.Walker) *backgroundReader {
+	br := &backgroundReader{
+		stopc:         make(chan struct{}),
+		latestSockets: map[uint64]*Proc{},
+	}
+	var (
+		walkc   = make(chan walkResult)
+		ticker  = time.NewTicker(time.Millisecond) // fire every millisecond, so the walk is not rate-limited
+		pWalker = newPidWalker(walker, ticker.C, fdBlockSize)
+	)
+
+	go performWalk(pWalker, walkc)
+
+	result := <-walkc
+	ticker.Stop()
+	br.mtx.Lock()
+	br.latestBuf = result.buf
+	br.latestSockets = result.sockets
+	br.mtx.Unlock()
+
+	return br
+}
+
 func (br *backgroundReader) stop() {
 	close(br.stopc)
 }
@@ -60,6 +82,7 @@ func performWalk(w pidWalker, c chan<- walkResult) {
 	var (
 		err    error
 		result = walkResult{
+			// TODO should we increase buf size?
 			buf: bytes.NewBuffer(make([]byte, 0, 5000)),
 		}
 	)
diff --git a/probe/endpoint/procspy/spy_darwin.go b/probe/endpoint/procspy/spy_darwin.go
index bec6f2a132..c0597e5edc 100644
--- a/probe/endpoint/procspy/spy_darwin.go
+++ b/probe/endpoint/procspy/spy_darwin.go
@@ -18,6 +18,11 @@ func NewConnectionScanner(_ process.Walker) ConnectionScanner {
 	return &darwinScanner{}
 }
 
+// NewSyncConnectionScanner creates a new synchronous Darwin ConnectionScanner
+func NewSyncConnectionScanner(_ process.Walker) ConnectionScanner {
+	return &darwinScanner{}
+}
+
 type darwinScanner struct{}
 
 // Connections returns all established (TCP) connections. No need to be root
diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go
index 61f189881c..f8a3cc72e2 100644
--- a/probe/endpoint/procspy/spy_linux.go
+++ b/probe/endpoint/procspy/spy_linux.go
@@ -38,6 +38,12 @@ func NewConnectionScanner(walker process.Walker) ConnectionScanner {
 	return &linuxScanner{br}
 }
 
+// NewSyncConnectionScanner creates a new synchronous Linux ConnectionScanner
+func NewSyncConnectionScanner(walker process.Walker) ConnectionScanner {
+	br := newForegroundReader(walker)
+	return &linuxScanner{br}
+}
+
 type linuxScanner struct {
 	br *backgroundReader
 }
diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go
index f58fce91a1..8f6bf62e41 100644
--- a/probe/endpoint/reporter.go
+++ b/probe/endpoint/reporter.go
@@ -1,10 +1,7 @@
 package endpoint
 
 import (
-	"fmt"
-	"sort"
 	"strconv"
-	"strings"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -19,6 +16,7 @@ const (
 	Addr            = "addr" // typically IPv4
 	Port            = "port"
 	Conntracked     = "conntracked"
+	EBPF            = "eBPF"
 	Procspied       = "procspied"
 	ReverseDNSNames = "reverse_dns_names"
 	SnoopedDNSNames = "snooped_dns_names"
@@ -31,6 +29,7 @@ type ReporterConfig struct {
 	SpyProcs     bool
 	UseConntrack bool
 	WalkProc     bool
+	UseEbpfConn  bool
 	ProcRoot     string
 	BufferSize   int
 	Scanner      procspy.ConnectionScanner
@@ -41,6 +40,7 @@ type ReporterConfig struct {
 type Reporter struct {
 	conf            ReporterConfig
 	flowWalker      flowWalker // interface
+	ebpfTracker     eventTracker
 	natMapper       natMapper
 	reverseResolver *reverseResolver
 }
@@ -66,6 +66,7 @@ func NewReporter(conf ReporterConfig) *Reporter {
 	return &Reporter{
 		conf:            conf,
 		flowWalker:      newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize),
+		ebpfTracker:     newEbpfTracker(conf.UseEbpfConn),
 		natMapper:       makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")),
 		reverseResolver: newReverseResolver(),
 	}
@@ -80,27 +81,7 @@ func (r *Reporter) Stop() {
 	r.natMapper.stop()
 	r.reverseResolver.stop()
 	r.conf.Scanner.Stop()
-}
-
-type fourTuple struct {
-	fromAddr, toAddr string
-	fromPort, toPort uint16
-}
-
-// key is a sortable direction-independent key for tuples, used to look up a
-// fourTuple, when you are unsure of it's direction.
-func (t fourTuple) key() string {
-	key := []string{
-		fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
-		fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
-	}
-	sort.Strings(key)
-	return strings.Join(key, " ")
-}
-
-// reverse flips the direction of the tuple
-func (t *fourTuple) reverse() {
-	t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort
+	// TODO add a Stop method for ebpfTracker
 }
 
 // Report implements Reporter.
@@ -111,6 +92,7 @@ func (r *Reporter) Report() (report.Report, error) {
 	hostNodeID := report.MakeHostNodeID(r.conf.HostID)
 
 	rpt := report.MakeReport()
+	seenTuples := map[string]fourTuple{}
 
 	// Consult the flowWalker for short-lived connections
@@ -144,6 +126,7 @@ func (r *Reporter) Report() (report.Report, error) {
 
 	if r.conf.WalkProc {
 		conns, err := r.conf.Scanner.Connections(r.conf.SpyProcs)
+		defer r.procParsingSwitcher()
 		if err != nil {
 			return rpt, err
 		}
@@ -174,13 +157,41 @@ func (r *Reporter) Report() (report.Report, error) {
 			// the direction.
 			canonical, ok := seenTuples[tuple.key()]
 			if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) {
-				tuple.reverse()
-				toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
+				r.feedToEbpf(tuple, true, int(conn.Proc.PID), namespaceID)
+				r.addConnection(&rpt, reverse(tuple), namespaceID, toNodeInfo, fromNodeInfo)
+			} else {
+				r.feedToEbpf(tuple, false, int(conn.Proc.PID), namespaceID)
+				r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo)
 			}
-			r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo)
 		}
 	}
 
+	// eBPF
+	if r.conf.UseEbpfConn && !r.ebpfTracker.hasDied() {
+		r.ebpfTracker.walkConnections(func(e ebpfConnection) {
+			fromNodeInfo := map[string]string{
+				Procspied: "true",
+				EBPF:      "true",
+			}
+			toNodeInfo := map[string]string{
+				Procspied: "true",
+				EBPF:      "true",
+			}
+			if e.pid > 0 {
+				fromNodeInfo[process.PID] = strconv.Itoa(e.pid)
+				fromNodeInfo[report.HostNodeID] = hostNodeID
+			}
+
+			if e.incoming {
+				r.addConnection(&rpt, reverse(e.tuple), e.networkNamespace, toNodeInfo, fromNodeInfo)
+			} else {
+				r.addConnection(&rpt, e.tuple, e.networkNamespace, fromNodeInfo, toNodeInfo)
+			}
+		})
+	}
+
 	r.natMapper.applyNAT(rpt, r.conf.HostID)
 	return rpt, nil
 }
@@ -214,3 +225,27 @@ func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16
 func newu64(i uint64) *uint64 {
 	return &i
 }
+
+// procParsingSwitcher makes sure that, if eBPF tracking is enabled,
+// connections coming from /proc parsing are only walked once.
+func (r *Reporter) procParsingSwitcher() {
+	if r.conf.WalkProc && r.conf.UseEbpfConn {
+		r.conf.WalkProc = false
+		r.ebpfTracker.initialize()
+	}
+}
+
+// feedToEbpf: if the eBPF tracker is enabled but not yet initialized,
+// feed the existing connections into it. Incoming connections correspond
+// to "accept" events; outgoing connections correspond to "connect" events.
+func (r Reporter) feedToEbpf(tuple fourTuple, incoming bool, pid int, namespaceID string) {
+	if r.conf.UseEbpfConn && !r.ebpfTracker.isInitialized() {
+		tcpEventType := "connect"
+
+		if incoming {
+			tcpEventType = "accept"
+		}
+
+		r.ebpfTracker.handleConnection(tcpEventType, tuple, pid, namespaceID)
+	}
+}
diff --git a/prog/main.go b/prog/main.go
index 956549cea3..bbe3c52110 100644
--- a/prog/main.go
+++ b/prog/main.go
@@ -95,6 +95,7 @@ type probeFlags struct {
 	spyProcs    bool // Associate endpoints with processes (must be root)
 	procEnabled bool // Produce process topology & process nodes in endpoint
+	useEbpfConn bool // Enable connection tracking with eBPF
 	procRoot    string
 
 	dockerEnabled bool
@@ -259,6 +260,7 @@ func main() {
 	flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)")
 	flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem")
 	flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections")
+	flag.BoolVar(&flags.probe.useEbpfConn, "probe.ebpf.connections", true, "enable connection tracking with eBPF")
 
 	// Docker
 	flag.BoolVar(&flags.probe.dockerEnabled, "probe.docker", false, "collect Docker-related attributes for processes")
diff --git a/prog/probe.go b/prog/probe.go
index 5d638ea81f..b64df75aaf 100644
--- a/prog/probe.go
+++ b/prog/probe.go
@@ -143,8 +143,14 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
 	var scanner procspy.ConnectionScanner
 	if flags.procEnabled {
 		processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot))
-		scanner = procspy.NewConnectionScanner(processCache)
+		processCache.Tick()
 		p.AddTicker(processCache)
+		// if eBPF tracking is enabled, scan /proc synchronously, and just once
+		if flags.useEbpfConn {
+			scanner = procspy.NewSyncConnectionScanner(processCache)
+		} else {
+			scanner = procspy.NewConnectionScanner(processCache)
+		}
 		p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies))
 	}
 
@@ -161,6 +167,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
 		SpyProcs:     flags.spyProcs,
 		UseConntrack: flags.useConntrack,
 		WalkProc:     flags.procEnabled,
+		UseEbpfConn:  flags.useEbpfConn,
 		ProcRoot:     flags.procRoot,
 		BufferSize:   flags.conntrackBufferSize,
 		Scanner:      scanner,
diff --git a/scope b/scope
index 4fdb077867..8eab0f8d3c 100755
--- a/scope
+++ b/scope
@@ -142,6 +142,9 @@ launch_command() {
     echo docker run --privileged -d --name=$SCOPE_CONTAINER_NAME --net=host --pid=host \
         -v /var/run/docker.sock:/var/run/docker.sock \
         -v /var/run/scope/plugins:/var/run/scope/plugins \
+        -v /usr/src:/usr/src \
+        -v /lib/modules:/lib/modules \
+        -v /sys/kernel/debug:/sys/kernel/debug \
         -e CHECKPOINT_DISABLE \
         $WEAVESCOPE_DOCKER_ARGS $SCOPE_IMAGE --probe.docker=true
 }
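
Testing notes (not part of the patch). With the image built, eBPF
tracking is on by default and is controlled by the new flag; assuming
extra arguments to `scope launch` are forwarded to the probe as usual,
something like the following should work:

    # launch with eBPF connection tracking (the default)
    ./scope launch

    # fall back to /proc-based tracking only
    ./scope launch --probe.ebpf.connections=false

    # run the tracer standalone inside the Scope container to check
    # that bcc can compile and attach the probes (container name is
    # whatever $SCOPE_CONTAINER_NAME resolves to, typically weavescope)
    docker exec -ti weavescope /home/weave/tcpv4tracer.py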