From cdcf4f596d656bfeb0623a6338dd8bf3ca29a04f Mon Sep 17 00:00:00 2001 From: Lorenzo Manacorda Date: Fri, 28 Oct 2016 14:58:47 +0200 Subject: [PATCH] Add eBPF connection tracking with gobpf Based on work from Lorenzo, updated by Iago and Alban This is the second attempt to add eBPF connection tracking. The first one was via https://github.com/weaveworks/scope/pull/1967 by forking a python script using bcc. This one is done in Golang directly thanks to [gobpf](https://github.com/iovisor/gobpf). This is not enabled by default. For now, it should be enabled manually with: ``` sudo ./scope launch --probe.ebpf.connections=true ``` Scope Probe also falls back on the old /proc parsing if eBPF is not working (e.g. too old kernel, or missing kernel headers). This allows scope to get notified of every connection event, without relying on the parsing of /proc/$pid/net/tcp{,6} and /proc/$pid/fd/*, and therefore improve performance. The eBPF program is in probe/endpoint/ebpf.go. It was discussed in bcc via iovisor/bcc#762. It is using kprobes on the following kernel functions: - tcp_v4_connect - inet_csk_accept - tcp_close It generates "connect", "accept" and "close" events containing the connection tuple but also the pid and the netns. probe/endpoint/ebpf.go maintains the list of connections. Similarly to conntrack, we keep the dead connections for one iteration in order to report the short-lived connections. The code for parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/* is still there and still used at start-up because eBPF only brings us the events and not the initial state. However, the /proc parsing for the initial state is now done in foreground instead of background, via newForegroundReader(). NAT resolutions on connections from eBPF work in the same way as it did on connections from /proc: by using conntrack. One of the two conntrack instances was removed since eBPF is able to get short-lived connections. 
The Scope Docker image is bigger because we need a few more packages for bcc: - weaveworks/scope in current master: 22 MB (compressed), 71 MB (uncompressed) - weaveworks/scope with this patchset: 83 MB (compressed), 223 MB (uncompressed) But @iaguis has ongoing work to reduce the size of the image. Limitations: - [ ] Does not support IPv6 - [ ] Sets `procspied: true` on connections coming from eBPF - [ ] Size of the Docker images - [ ] Requirement on kernel headers for now - [ ] Location of kernel headers: iovisor/bcc#743 Fixes #1168 (walking /proc to obtain connections is very expensive) Fixes #1260 (Short-lived connections not tracked for containers in shared networking namespaces) --- backend/Dockerfile | 6 +- probe/endpoint/ebpf.go | 236 ++++++++++++++++++ probe/endpoint/four_tuple.go | 43 ++++ .../procspy/background_reader_linux.go | 22 ++ probe/endpoint/procspy/spy_darwin.go | 5 + probe/endpoint/procspy/spy_linux.go | 6 + probe/endpoint/reporter.go | 94 ++++--- prog/main.go | 2 + prog/probe.go | 9 +- scope | 1 + 10 files changed, 393 insertions(+), 31 deletions(-) create mode 100644 probe/endpoint/ebpf.go create mode 100644 probe/endpoint/four_tuple.go diff --git a/backend/Dockerfile b/backend/Dockerfile index 3c1c137951..f92df7acdc 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -1,6 +1,8 @@ -FROM golang:1.7.1 +FROM ubuntu:yakkety +ENV GOPATH /go +ENV PATH /go/bin:/usr/lib/go-1.7/bin:/usr/bin:/bin:/usr/sbin:/sbin RUN apt-get update && \ - apt-get install -y libpcap-dev python-requests time file shellcheck && \ + apt-get install -y libpcap-dev python-requests time file shellcheck golang-1.7 git && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN go clean -i net && \ go install -tags netgo std && \ diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go new file mode 100644 index 0000000000..2f02ac1c85 --- /dev/null +++ b/probe/endpoint/ebpf.go @@ -0,0 +1,236 @@ +package endpoint + +import ( + "bytes" + "encoding/binary" + "net" + "sync" + 
"unsafe" + "strconv" + + log "github.com/Sirupsen/logrus" + bpflib "github.com/kinvolk/gobpf-elf-loader/bpf" +) + +import "C" + +var byteOrder binary.ByteOrder + +type eventType uint32 + +// These constants should be in sync with the equivalent definitions in the ebpf program. +const ( + _ eventType = iota + EventConnect + EventAccept + EventClose +) + +func (e eventType) String() string { + switch e { + case EventConnect: + return "connect" + case EventAccept: + return "accept" + case EventClose: + return "close" + default: + return "unknown" + } +} + +// tcpEvent should be in sync with the struct in the ebpf maps. +type tcpEvent struct { + // Timestamp must be the first field, the sorting depends on it + Timestamp uint64 + + CPU uint64 + Type uint32 + Pid uint32 + Comm [16]byte + SAddr uint32 + DAddr uint32 + SPort uint16 + DPort uint16 + NetNS uint32 +} + +// An ebpfConnection represents a TCP connection +type ebpfConnection struct { + tuple fourTuple + networkNamespace string + incoming bool + pid int +} + +type eventTracker interface { + handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) + hasDied() bool + run() + walkConnections(f func(ebpfConnection)) + initialize() + isInitialized() bool + stop() +} + +var ebpfTracker *EbpfTracker + +// nilTracker is a tracker that does nothing, and it implements the eventTracker interface. +// It is returned when the useEbpfConn flag is false. +type nilTracker struct{} + +func (n nilTracker) handleConnection(_ string, _ fourTuple, _ int, _ string) {} +func (n nilTracker) hasDied() bool { return true } +func (n nilTracker) run() {} +func (n nilTracker) walkConnections(f func(ebpfConnection)) {} +func (n nilTracker) initialize() {} +func (n nilTracker) isInitialized() bool { return false } + +// EbpfTracker contains the sets of open and closed TCP connections. +// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`. 
+type EbpfTracker struct { + sync.Mutex + reader *bpflib.BPFKProbePerf + initialized bool + dead bool + + openConnections map[string]ebpfConnection + closedConnections []ebpfConnection +} + +func newEbpfTracker(useEbpfConn bool) eventTracker { + var i int32 = 0x01020304 + u := unsafe.Pointer(&i) + pb := (*byte)(u) + b := *pb + if b == 0x04 { + byteOrder = binary.LittleEndian + } else { + byteOrder = binary.BigEndian + } + + if !useEbpfConn { + return &nilTracker{} + } + + bpfPerfEvent := bpflib.NewBpfPerfEvent("/var/run/scope/ebpf/ebpf.o") + err := bpfPerfEvent.Load() + if err != nil { + log.Errorf("Error loading BPF program: %v", err) + return &nilTracker{} + } + + tracker := &EbpfTracker{ + openConnections: map[string]ebpfConnection{}, + reader: bpfPerfEvent, + } + go tracker.run() + + ebpfTracker = tracker + return tracker +} + +func (t *EbpfTracker) handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) { + t.Lock() + defer t.Unlock() + + switch eventType { + case "connect": + conn := ebpfConnection{ + incoming: false, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case "accept": + conn := ebpfConnection{ + incoming: true, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case "close": + if deadConn, ok := t.openConnections[tuple.String()]; ok { + delete(t.openConnections, tuple.String()) + t.closedConnections = append(t.closedConnections, deadConn) + } else { + log.Errorf("EbpfTracker error: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace) + } + } +} + +func tcpEventCallback(event tcpEvent) { + typ := eventType(event.Type) + pid := event.Pid & 0xffffffff + + saddrbuf := make([]byte, 4) + daddrbuf := make([]byte, 4) + + binary.LittleEndian.PutUint32(saddrbuf, uint32(event.SAddr)) + binary.LittleEndian.PutUint32(daddrbuf, uint32(event.DAddr)) + + sIP := 
net.IPv4(saddrbuf[0], saddrbuf[1], saddrbuf[2], saddrbuf[3]) + dIP := net.IPv4(daddrbuf[0], daddrbuf[1], daddrbuf[2], daddrbuf[3]) + + sport := event.SPort + dport := event.DPort + + tuple := fourTuple{sIP.String(), dIP.String(), uint16(sport), uint16(dport)} + + ebpfTracker.handleConnection(typ.String(), tuple, int(pid), strconv.FormatUint(uint64(event.NetNS), 10)) +} + +// walkConnections calls f with all open connections and connections that have come and gone +// since the last call to walkConnections +func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) { + t.Lock() + defer t.Unlock() + + for _, connection := range t.openConnections { + f(connection) + } + for _, connection := range t.closedConnections { + f(connection) + } + t.closedConnections = t.closedConnections[:0] +} + +func (t *EbpfTracker) run() { + channel := make(chan []byte) + + go func() { + var event tcpEvent + for { + data := <-channel + err := binary.Read(bytes.NewBuffer(data), byteOrder, &event) + if err != nil { + log.Errorf("failed to decode received data: %s\n", err) + continue + } + tcpEventCallback(event) + } + }() + + t.reader.PollStart("tcp_event_v4", channel) +} + +func (t *EbpfTracker) hasDied() bool { + t.Lock() + defer t.Unlock() + + return t.dead +} + +func (t *EbpfTracker) initialize() { + t.initialized = true +} + +func (t *EbpfTracker) isInitialized() bool { + return t.initialized +} + +func (t *EbpfTracker) stop() { + // TODO: stop the go routine in run() +} diff --git a/probe/endpoint/four_tuple.go b/probe/endpoint/four_tuple.go new file mode 100644 index 0000000000..ff2f80e0c2 --- /dev/null +++ b/probe/endpoint/four_tuple.go @@ -0,0 +1,43 @@ +package endpoint + +import ( + "fmt" + "sort" + "strings" +) + +// fourTuple is an (IP, port, IP, port) tuple, representing a connection +type fourTuple struct { + fromAddr, toAddr string + fromPort, toPort uint16 +} + +func (t fourTuple) String() string { + return fmt.Sprintf("%s:%d-%s:%d", t.fromAddr, t.fromPort, t.toAddr, 
t.toPort) +} + +// key is a sortable direction-independent key for tuples, used to look up a +// fourTuple when you are unsure of its direction. +func (t fourTuple) key() string { + key := []string{ + fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), + fmt.Sprintf("%s:%d", t.toAddr, t.toPort), + } + sort.Strings(key) + return strings.Join(key, " ") +} + +// reverse flips the direction of the tuple +func (t *fourTuple) reverse() { + t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort +} + +// reverse flips the direction of a tuple, without side effects +func reverse(tuple fourTuple) fourTuple { + return fourTuple{ + fromAddr: tuple.toAddr, + toAddr: tuple.fromAddr, + fromPort: tuple.toPort, + toPort: tuple.fromPort, + } +} diff --git a/probe/endpoint/procspy/background_reader_linux.go b/probe/endpoint/procspy/background_reader_linux.go index d618998a60..7c729df3e1 100644 --- a/probe/endpoint/procspy/background_reader_linux.go +++ b/probe/endpoint/procspy/background_reader_linux.go @@ -38,6 +38,28 @@ func newBackgroundReader(walker process.Walker) *backgroundReader { return br } +func newForegroundReader(walker process.Walker) *backgroundReader { + br := &backgroundReader{ + stopc: make(chan struct{}), + latestSockets: map[uint64]*Proc{}, + } + var ( + walkc = make(chan walkResult) + ticker = time.NewTicker(time.Millisecond) // fire every millisecond + pWalker = newPidWalker(walker, ticker.C, fdBlockSize) + ) + + go performWalk(pWalker, walkc) + + result := <-walkc + br.mtx.Lock() + br.latestBuf = result.buf + br.latestSockets = result.sockets + br.mtx.Unlock() + + return br +} + func (br *backgroundReader) stop() { close(br.stopc) } diff --git a/probe/endpoint/procspy/spy_darwin.go b/probe/endpoint/procspy/spy_darwin.go index bec6f2a132..394407f39f 100644 --- a/probe/endpoint/procspy/spy_darwin.go +++ b/probe/endpoint/procspy/spy_darwin.go @@ -18,6 +18,11 @@ func NewConnectionScanner(_ process.Walker) ConnectionScanner { return 
&darwinScanner{} } +// NewSyncConnectionScanner creates a new synchronous Darwin ConnectionScanner +func NewSyncConnectionScanner(_ process.Walker) ConnectionScanner { + return &darwinScanner{} +} + type darwinScanner struct{} // Connections returns all established (TCP) connections. No need to be root diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 61f189881c..f8a3cc72e2 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -38,6 +38,12 @@ func NewConnectionScanner(walker process.Walker) ConnectionScanner { return &linuxScanner{br} } +// NewSyncConnectionScanner creates a new synchronous Linux ConnectionScanner +func NewSyncConnectionScanner(walker process.Walker) ConnectionScanner { + br := newForegroundReader(walker) + return &linuxScanner{br} +} + type linuxScanner struct { br *backgroundReader } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index f58fce91a1..b7398ef10b 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,10 +1,7 @@ package endpoint import ( - "fmt" - "sort" "strconv" - "strings" "time" "github.com/prometheus/client_golang/prometheus" @@ -19,6 +16,7 @@ const ( Addr = "addr" // typically IPv4 Port = "port" Conntracked = "conntracked" + EBPF = "eBPF" Procspied = "procspied" ReverseDNSNames = "reverse_dns_names" SnoopedDNSNames = "snooped_dns_names" @@ -31,6 +29,7 @@ type ReporterConfig struct { SpyProcs bool UseConntrack bool WalkProc bool + UseEbpfConn bool ProcRoot string BufferSize int Scanner procspy.ConnectionScanner @@ -41,6 +40,7 @@ type ReporterConfig struct { type Reporter struct { conf ReporterConfig flowWalker flowWalker // interface + ebpfTracker eventTracker natMapper natMapper reverseResolver *reverseResolver } @@ -66,6 +66,7 @@ func NewReporter(conf ReporterConfig) *Reporter { return &Reporter{ conf: conf, flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize), + 
ebpfTracker: newEbpfTracker(conf.UseEbpfConn), natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")), reverseResolver: newReverseResolver(), } @@ -80,27 +81,7 @@ func (r *Reporter) Stop() { r.natMapper.stop() r.reverseResolver.stop() r.conf.Scanner.Stop() -} - -type fourTuple struct { - fromAddr, toAddr string - fromPort, toPort uint16 -} - -// key is a sortable direction-independent key for tuples, used to look up a -// fourTuple, when you are unsure of it's direction. -func (t fourTuple) key() string { - key := []string{ - fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), - fmt.Sprintf("%s:%d", t.toAddr, t.toPort), - } - sort.Strings(key) - return strings.Join(key, " ") -} - -// reverse flips the direction of the tuple -func (t *fourTuple) reverse() { - t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort + r.ebpfTracker.stop() } // Report implements Reporter. @@ -111,10 +92,12 @@ func (r *Reporter) Report() (report.Report, error) { hostNodeID := report.MakeHostNodeID(r.conf.HostID) rpt := report.MakeReport() + seenTuples := map[string]fourTuple{} // Consult the flowWalker for short-lived connections - { + // With eBPF, this is used only in the first round to build seenTuples for WalkProc + if r.conf.WalkProc || !r.conf.UseEbpfConn { extraNodeInfo := map[string]string{ Conntracked: "true", } @@ -144,6 +127,7 @@ func (r *Reporter) Report() (report.Report, error) { if r.conf.WalkProc { conns, err := r.conf.Scanner.Connections(r.conf.SpyProcs) + defer r.procParsingSwitcher() if err != nil { return rpt, err } @@ -174,13 +158,41 @@ func (r *Reporter) Report() (report.Report, error) { // the direction. 
canonical, ok := seenTuples[tuple.key()] if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) { - tuple.reverse() - toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo + r.feedToEbpf(tuple, true, int(conn.Proc.PID), namespaceID) + r.addConnection(&rpt, reverse(tuple), namespaceID, toNodeInfo, fromNodeInfo) + } else { + r.feedToEbpf(tuple, false, int(conn.Proc.PID), namespaceID) + r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) } - r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) + } } + // eBPF + if r.conf.UseEbpfConn && !r.ebpfTracker.hasDied() { + r.ebpfTracker.walkConnections(func(e ebpfConnection) { + fromNodeInfo := map[string]string{ + Procspied: "true", + EBPF: "true", + } + toNodeInfo := map[string]string{ + Procspied: "true", + EBPF: "true", + } + if e.pid > 0 { + fromNodeInfo[process.PID] = strconv.Itoa(e.pid) + fromNodeInfo[report.HostNodeID] = hostNodeID + } + + if e.incoming { + r.addConnection(&rpt, reverse(e.tuple), e.networkNamespace, toNodeInfo, fromNodeInfo) + } else { + r.addConnection(&rpt, e.tuple, e.networkNamespace, fromNodeInfo, toNodeInfo) + } + + }) + } + r.natMapper.applyNAT(rpt, r.conf.HostID) return rpt, nil } @@ -214,3 +226,29 @@ func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16 func newu64(i uint64) *uint64 { return &i } + +// procParsingSwitcher make sure that if eBPF tracking is enabled, +// connections coming from /proc parsing are only walked once. 
+func (r *Reporter) procParsingSwitcher() { + if r.conf.WalkProc && r.conf.UseEbpfConn { + r.conf.WalkProc = false + r.ebpfTracker.initialize() + + r.flowWalker.stop() + } +} + +// if the eBPF tracker is enabled, feed the existing connections into it +// incoming connections correspond to "accept" events +// outgoing connections correspond to "connect" events +func (r Reporter) feedToEbpf(tuple fourTuple, incoming bool, pid int, namespaceID string) { + if r.conf.UseEbpfConn && !r.ebpfTracker.isInitialized() { + tcpEventType := "connect" + + if incoming { + tcpEventType = "accept" + } + + r.ebpfTracker.handleConnection(tcpEventType, tuple, pid, namespaceID) + } +} diff --git a/prog/main.go b/prog/main.go index 0a679970c6..9ba28add63 100644 --- a/prog/main.go +++ b/prog/main.go @@ -95,6 +95,7 @@ type probeFlags struct { spyProcs bool // Associate endpoints with processes (must be root) procEnabled bool // Produce process topology & process nodes in endpoint + useEbpfConn bool // Enable connection tracking with eBPF procRoot string dockerEnabled bool @@ -261,6 +262,7 @@ func main() { flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)") flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem") flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections") + flag.BoolVar(&flags.probe.useEbpfConn, "probe.ebpf.connections", false, "enable connection tracking with eBPF") // Docker flag.BoolVar(&flags.probe.dockerEnabled, "probe.docker", false, "collect Docker-related attributes for processes") diff --git a/prog/probe.go b/prog/probe.go index f4030ea219..4f81a09c99 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -147,8 +147,14 @@ func probeMain(flags probeFlags, targets []appclient.Target) { var scanner procspy.ConnectionScanner if flags.procEnabled { processCache = 
process.NewCachingWalker(process.NewWalker(flags.procRoot)) - scanner = procspy.NewConnectionScanner(processCache) + processCache.Tick() p.AddTicker(processCache) + // if eBPF tracking is enabled, scan /proc synchronously, and just once + if flags.useEbpfConn { + scanner = procspy.NewSyncConnectionScanner(processCache) + } else { + scanner = procspy.NewConnectionScanner(processCache) + } p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies)) } @@ -165,6 +171,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { SpyProcs: flags.spyProcs, UseConntrack: flags.useConntrack, WalkProc: flags.procEnabled, + UseEbpfConn: flags.useEbpfConn, ProcRoot: flags.procRoot, BufferSize: flags.conntrackBufferSize, Scanner: scanner, diff --git a/scope b/scope index 4fdb077867..818e3c4f69 100755 --- a/scope +++ b/scope @@ -142,6 +142,7 @@ launch_command() { echo docker run --privileged -d --name=$SCOPE_CONTAINER_NAME --net=host --pid=host \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /var/run/scope/plugins:/var/run/scope/plugins \ + -v /sys/kernel/debug:/sys/kernel/debug \ -e CHECKPOINT_DISABLE \ $WEAVESCOPE_DOCKER_ARGS $SCOPE_IMAGE --probe.docker=true }