Add eBPF connection tracking with gobpf
Based on work from Lorenzo, updated by Iago and Alban

This is the second attempt to add eBPF connection tracking. The first
one, weaveworks#1967, worked by forking a
Python script using bcc. This one is done directly in Go, thanks to
[gobpf](https://github.com/iovisor/gobpf).

This is not enabled by default. For now, it should be enabled manually
with:
```
sudo ./scope launch --probe.ebpf.connections=true
```
Scope Probe also falls back to the old /proc parsing if eBPF is not
working (e.g. the kernel is too old, or kernel headers are missing).
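
Whether the fallback happens is visible from the tracker's API. A minimal
sketch, with hypothetical wiring around `newEbpfTracker` from
probe/endpoint/ebpf.go (shown in full below); `useEbpfConn` here is an
illustrative flag variable:
```go
// Hypothetical wiring (the real logic lives in the endpoint reporter):
// newEbpfTracker returns a no-op nilTracker when eBPF is disabled or the
// BPF object fails to load, and nilTracker.hasDied() always returns true.
tracker := newEbpfTracker(useEbpfConn)
if tracker.hasDied() {
	// eBPF unavailable: keep relying on periodic /proc parsing.
}
```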

This allows Scope to be notified of every connection event without
relying on parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/*,
and therefore improves performance.

The eBPF program is in probe/endpoint/ebpf.go. It was discussed upstream
in iovisor/bcc#762.
It uses kprobes on the following kernel functions:
- tcp_v4_connect
- inet_csk_accept
- tcp_close

It generates "connect", "accept" and "close" events containing the
connection tuple as well as the pid and the netns.

probe/endpoint/ebpf.go maintains the list of connections. Similarly to
conntrack, we keep the dead connections for one iteration in order to
report the short-lived connections.
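
From the consumer side, a minimal sketch of what this guarantees, assuming
a reporting loop that holds a `tracker` (illustrative variable) and calls
`walkConnections` once per iteration:
```go
// Each iteration sees all currently open connections plus every connection
// that closed since the previous iteration; walkConnections then resets the
// closed list, so a short-lived connection is reported exactly once.
tracker.walkConnections(func(c ebpfConnection) {
	log.Debugf("conn %s pid=%d netns=%s incoming=%v",
		c.tuple.String(), c.pid, c.networkNamespace, c.incoming)
})
```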

The code for parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/* is still
there and is still used at start-up, because eBPF only brings us the
events and not the initial state. However, the /proc parsing for the
initial state is now done in the foreground instead of the background,
via newForegroundReader().
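
A rough sketch of the resulting start-up sequence; `NewSyncConnectionScanner`
is added by this patchset, while the surrounding wiring and the
`processWalker` variable are illustrative:
```go
// The synchronous scanner blocks until the first full /proc walk completes,
// so the probe has a complete picture of pre-existing connections before
// marking the tracker initialized and trusting eBPF events alone.
scanner := procspy.NewSyncConnectionScanner(processWalker)
// ... report the initial connections obtained from the scanner ...
tracker.initialize()
```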

NAT resolution on connections from eBPF works the same way it did on
connections from /proc: by using conntrack. One of the two conntrack
instances was removed, since eBPF is able to capture short-lived connections.

The Scope Docker image is bigger because we need a few more packages
for bcc:
- weaveworks/scope in current master: 22 MB (compressed), 71 MB (uncompressed)
- weaveworks/scope with this patchset: 83 MB (compressed), 223 MB (uncompressed)

But @iaguis has ongoing work to reduce the size of the image.

Limitations:
- [ ] Does not support IPv6
- [ ] Sets `procspied: true` on connections coming from eBPF
- [ ] Size of the Docker images
- [ ] Requirement on kernel headers for now
- [ ] Location of kernel headers: iovisor/bcc#743

Fixes weaveworks#1168 (walking /proc to obtain connections is very expensive)

Fixes weaveworks#1260 (Short-lived connections not tracked for containers in
shared networking namespaces)
Lorenzo Manacorda authored and iaguis committed Dec 7, 2016
1 parent 2ced2e3 commit 243283d
Showing 10 changed files with 394 additions and 31 deletions.
6 changes: 4 additions & 2 deletions backend/Dockerfile
```diff
@@ -1,6 +1,8 @@
-FROM golang:1.7.1
+FROM ubuntu:yakkety
+ENV GOPATH /go
+ENV PATH /go/bin:/usr/lib/go-1.7/bin:/usr/bin:/bin:/usr/sbin:/sbin
 RUN apt-get update && \
-	apt-get install -y libpcap-dev python-requests time file shellcheck && \
+	apt-get install -y libpcap-dev python-requests time file shellcheck golang-1.7 git && \
 	rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
 RUN go clean -i net && \
 	go install -tags netgo std && \
```
237 changes: 237 additions & 0 deletions probe/endpoint/ebpf.go
```go
package endpoint

import (
	"bytes"
	"encoding/binary"
	"net"
	"strconv"
	"sync"
	"unsafe"

	log "github.com/Sirupsen/logrus"
	bpflib "github.com/kinvolk/gobpf-elf-loader/bpf"
)

import "C"

var byteOrder binary.ByteOrder

type eventType uint32

// These constants should be in sync with the equivalent definitions in the ebpf program.
const (
	_ eventType = iota
	EventConnect
	EventAccept
	EventClose
)

func (e eventType) String() string {
	switch e {
	case EventConnect:
		return "connect"
	case EventAccept:
		return "accept"
	case EventClose:
		return "close"
	default:
		return "unknown"
	}
}

// tcpEvent should be in sync with the struct in the ebpf maps.
type tcpEvent struct {
	// Timestamp must be the first field, the sorting depends on it
	Timestamp uint64

	CPU   uint64
	Type  uint32
	Pid   uint32
	Comm  [16]byte
	SAddr uint32
	DAddr uint32
	SPort uint16
	DPort uint16
	NetNS uint32
}

// An ebpfConnection represents a TCP connection
type ebpfConnection struct {
	tuple            fourTuple
	networkNamespace string
	incoming         bool
	pid              int
}

type eventTracker interface {
	handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string)
	hasDied() bool
	run()
	walkConnections(f func(ebpfConnection))
	initialize()
	isInitialized() bool
	stop()
}

var ebpfTracker *EbpfTracker

// nilTracker is a tracker that does nothing, and it implements the eventTracker interface.
// It is returned when the useEbpfConn flag is false.
type nilTracker struct{}

func (n nilTracker) handleConnection(_ string, _ fourTuple, _ int, _ string) {}
func (n nilTracker) hasDied() bool                                           { return true }
func (n nilTracker) run()                                                    {}
func (n nilTracker) walkConnections(f func(ebpfConnection))                  {}
func (n nilTracker) initialize()                                             {}
func (n nilTracker) isInitialized() bool                                     { return false }
func (n nilTracker) stop()                                                   {}

// EbpfTracker contains the sets of open and closed TCP connections.
// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`.
type EbpfTracker struct {
	sync.Mutex
	reader      *bpflib.BPFKProbePerf
	initialized bool
	dead        bool

	openConnections   map[string]ebpfConnection
	closedConnections []ebpfConnection
}

func newEbpfTracker(useEbpfConn bool) eventTracker {
	// Detect the host's byte order by inspecting how a known integer is
	// laid out in memory; the perf events are decoded with this order.
	var i int32 = 0x01020304
	u := unsafe.Pointer(&i)
	pb := (*byte)(u)
	b := *pb
	if b == 0x04 {
		byteOrder = binary.LittleEndian
	} else {
		byteOrder = binary.BigEndian
	}

	if !useEbpfConn {
		return &nilTracker{}
	}

	bpfPerfEvent := bpflib.NewBpfPerfEvent("/var/run/scope/ebpf/ebpf.o")
	err := bpfPerfEvent.Load()
	if err != nil {
		log.Errorf("Error loading BPF program: %v", err)
		return &nilTracker{}
	}

	tracker := &EbpfTracker{
		openConnections: map[string]ebpfConnection{},
		reader:          bpfPerfEvent,
	}
	go tracker.run()

	ebpfTracker = tracker
	return tracker
}

func (t *EbpfTracker) handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) {
	t.Lock()
	defer t.Unlock()

	switch eventType {
	case "connect":
		conn := ebpfConnection{
			incoming:         false,
			tuple:            tuple,
			pid:              pid,
			networkNamespace: networkNamespace,
		}
		t.openConnections[tuple.String()] = conn
	case "accept":
		conn := ebpfConnection{
			incoming:         true,
			tuple:            tuple,
			pid:              pid,
			networkNamespace: networkNamespace,
		}
		t.openConnections[tuple.String()] = conn
	case "close":
		if deadConn, ok := t.openConnections[tuple.String()]; ok {
			delete(t.openConnections, tuple.String())
			t.closedConnections = append(t.closedConnections, deadConn)
		} else {
			log.Errorf("EbpfTracker error: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace)
		}
	}
}

func tcpEventCallback(event tcpEvent) {
	typ := eventType(event.Type)
	pid := event.Pid & 0xffffffff

	saddrbuf := make([]byte, 4)
	daddrbuf := make([]byte, 4)

	binary.LittleEndian.PutUint32(saddrbuf, uint32(event.SAddr))
	binary.LittleEndian.PutUint32(daddrbuf, uint32(event.DAddr))

	sIP := net.IPv4(saddrbuf[0], saddrbuf[1], saddrbuf[2], saddrbuf[3])
	dIP := net.IPv4(daddrbuf[0], daddrbuf[1], daddrbuf[2], daddrbuf[3])

	sport := event.SPort
	dport := event.DPort

	tuple := fourTuple{sIP.String(), dIP.String(), uint16(sport), uint16(dport)}

	ebpfTracker.handleConnection(typ.String(), tuple, int(pid), strconv.FormatUint(uint64(event.NetNS), 10))
}

// walkConnections calls f with all open connections and connections that have come and gone
// since the last call to walkConnections
func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) {
	t.Lock()
	defer t.Unlock()

	for _, connection := range t.openConnections {
		f(connection)
	}
	for _, connection := range t.closedConnections {
		f(connection)
	}
	t.closedConnections = t.closedConnections[:0]
}

func (t *EbpfTracker) run() {
	channel := make(chan []byte)

	go func() {
		var event tcpEvent
		for {
			data := <-channel
			err := binary.Read(bytes.NewBuffer(data), byteOrder, &event)
			if err != nil {
				log.Errorf("failed to decode received data: %s\n", err)
				continue
			}
			tcpEventCallback(event)
		}
	}()

	t.reader.PollStart("tcp_event_v4", channel)
}

func (t *EbpfTracker) hasDied() bool {
	t.Lock()
	defer t.Unlock()

	return t.dead
}

func (t *EbpfTracker) initialize() {
	t.initialized = true
}

func (t *EbpfTracker) isInitialized() bool {
	return t.initialized
}

func (t *EbpfTracker) stop() {
	// TODO: stop the go routine in run()
}
```
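
To make the decode path concrete, here is a hand-built event pushed through
`tcpEventCallback`; the values are invented for illustration, and the snippet
assumes a tracker has already been created (so the `ebpfTracker` global is set):
```go
// Illustration only: a connect event for 10.0.0.1:34567 -> 10.0.0.2:80.
// In production the kernel-side program fills this struct via the perf map.
event := tcpEvent{
	Timestamp: 1,
	Type:      uint32(EventConnect),
	Pid:       4242,
	SAddr:     binary.LittleEndian.Uint32(net.ParseIP("10.0.0.1").To4()),
	DAddr:     binary.LittleEndian.Uint32(net.ParseIP("10.0.0.2").To4()),
	SPort:     34567,
	DPort:     80,
	NetNS:     4026531957,
}
tcpEventCallback(event) // records an outgoing connection in openConnections
```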
43 changes: 43 additions & 0 deletions probe/endpoint/four_tuple.go
```go
package endpoint

import (
	"fmt"
	"sort"
	"strings"
)

// fourTuple is an (IP, port, IP, port) tuple, representing a connection
type fourTuple struct {
	fromAddr, toAddr string
	fromPort, toPort uint16
}

func (t fourTuple) String() string {
	return fmt.Sprintf("%s:%d-%s:%d", t.fromAddr, t.fromPort, t.toAddr, t.toPort)
}

// key is a sortable direction-independent key for tuples, used to look up a
// fourTuple when you are unsure of its direction.
func (t fourTuple) key() string {
	key := []string{
		fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
		fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
	}
	sort.Strings(key)
	return strings.Join(key, " ")
}

// reverse flips the direction of the tuple
func (t *fourTuple) reverse() {
	t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort
}

// reverse flips the direction of a tuple, without side effects
func reverse(tuple fourTuple) fourTuple {
	return fourTuple{
		fromAddr: tuple.toAddr,
		toAddr:   tuple.fromAddr,
		fromPort: tuple.toPort,
		toPort:   tuple.fromPort,
	}
}
```
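
A quick illustration of the direction handling, with expected output in comments:
```go
t := fourTuple{fromAddr: "10.0.0.1", toAddr: "10.0.0.2", fromPort: 34567, toPort: 80}
fmt.Println(t.String())                  // 10.0.0.1:34567-10.0.0.2:80
fmt.Println(reverse(t).String())         // 10.0.0.2:80-10.0.0.1:34567
fmt.Println(t.key() == reverse(t).key()) // true: key() is direction-independent
```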
22 changes: 22 additions & 0 deletions probe/endpoint/procspy/background_reader_linux.go
```diff
@@ -38,6 +38,28 @@ func newBackgroundReader(walker process.Walker) *backgroundReader {
 	return br
 }
 
+func newForegroundReader(walker process.Walker) *backgroundReader {
+	br := &backgroundReader{
+		stopc:         make(chan struct{}),
+		latestSockets: map[uint64]*Proc{},
+	}
+	var (
+		walkc   = make(chan walkResult)
+		ticker  = time.NewTicker(time.Millisecond) // fire every millisecond
+		pWalker = newPidWalker(walker, ticker.C, fdBlockSize)
+	)
+
+	go performWalk(pWalker, walkc)
+
+	result := <-walkc
+	br.mtx.Lock()
+	br.latestBuf = result.buf
+	br.latestSockets = result.sockets
+	br.mtx.Unlock()
+
+	return br
+}
+
 func (br *backgroundReader) stop() {
 	close(br.stopc)
 }
```
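
The difference from `newBackgroundReader` is purely one of synchronization;
a sketch of the contract, with illustrative caller code:
```go
// newBackgroundReader returns immediately and fills latestBuf from a
// background goroutine; newForegroundReader blocks on the first walkResult,
// so the buffer is already populated when it returns.
br := newForegroundReader(walker) // initial /proc walk has completed here
```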
5 changes: 5 additions & 0 deletions probe/endpoint/procspy/spy_darwin.go
```diff
@@ -18,6 +18,11 @@ func NewConnectionScanner(_ process.Walker) ConnectionScanner {
 	return &darwinScanner{}
 }
 
+// NewSyncConnectionScanner creates a new synchronous Darwin ConnectionScanner
+func NewSyncConnectionScanner(_ process.Walker) ConnectionScanner {
+	return &darwinScanner{}
+}
+
 type darwinScanner struct{}
 
 // Connections returns all established (TCP) connections. No need to be root
```
6 changes: 6 additions & 0 deletions probe/endpoint/procspy/spy_linux.go
```diff
@@ -38,6 +38,12 @@ func NewConnectionScanner(walker process.Walker) ConnectionScanner {
 	return &linuxScanner{br}
 }
 
+// NewSyncConnectionScanner creates a new synchronous Linux ConnectionScanner
+func NewSyncConnectionScanner(walker process.Walker) ConnectionScanner {
+	br := newForegroundReader(walker)
+	return &linuxScanner{br}
+}
+
 type linuxScanner struct {
 	br *backgroundReader
 }
```