Skip to content
This repository has been archived by the owner on Apr 20, 2021. It is now read-only.

Commit

Permalink
Add eBPF connection tracking without dependencies on kernel headers
Browse files Browse the repository at this point in the history
Based on work from Lorenzo, updated by Iago, Alban, Alessandro and
Michael.

This PR adds connection tracking using eBPF. This feature is not enabled by default.
For now, you can enable it by launching scope with the following command:

```
sudo ./scope launch --probe.ebpf.connections=true
```

This patch allows scope to get notified of every connection event,
without relying on the parsing of /proc/$pid/net/tcp{,6} and
/proc/$pid/fd/*, and therefore improve performance.

We vendor https://github.com/iovisor/gobpf in Scope to load the
pre-compiled ebpf program and https://github.com/weaveworks/tcptracer-bpf
to guess the offsets of the structures we need in the kernel. In this
way we don't need a different pre-compiled ebpf object file per kernel.
The pre-compiled ebpf program is included in the vendoring of
tcptracer-bpf.

The ebpf program uses kprobes/kretprobes on the following kernel functions:
- tcp_v4_connect
- tcp_v6_connect
- tcp_set_state
- inet_csk_accept
- tcp_close

It generates "connect", "accept" and "close" events containing the
connection tuple but also pid and netns.
Note: the IPv6 events are not supported in Scope and thus not passed on.

probe/endpoint/ebpf.go maintains the list of connections. Similarly to
conntrack, it also keeps the dead connections for one iteration in order
to report short-lived connections.

The code for parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/* is still
there and still used at start-up because eBPF only brings us the events
and not the initial state. However, the /proc parsing for the initial
state is now done in foreground instead of background, via
newForegroundReader().

NAT resolution on connections from eBPF works in the same way as it did
on connections from /proc: by using conntrack. One of the two conntrack
instances is only started to get the initial state and then it is
stopped since eBPF detects short-lived connections.

The Scope Docker image size comparison:
- weaveworks/scope in current master:  22 MB (compressed),  68 MB
  (uncompressed)
- weaveworks/scope with this patchset: 23 MB (compressed), 69 MB
  (uncompressed)

Fixes weaveworks#1168 (walking /proc to obtain connections is very expensive)

Fixes weaveworks#1260 (Short-lived connections not tracked for containers in
shared networking namespaces)

Fixes weaveworks#1962 (Port ebpf tracker to Go)

Fixes weaveworks#1961 (Remove runtime kernel header dependency from ebpf tracker)
  • Loading branch information
iaguis authored and alban committed Mar 8, 2017
1 parent bdc3763 commit a422998
Show file tree
Hide file tree
Showing 18 changed files with 878 additions and 191 deletions.
19 changes: 15 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,24 @@ RM=--rm
RUN_FLAGS=-ti
BUILD_IN_CONTAINER=true
GO_ENV=GOGC=off
GO=env $(GO_ENV) go
NO_CROSS_COMP=unset GOOS GOARCH
GO_HOST=$(NO_CROSS_COMP); $(GO)
WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV)
GO_BUILD_INSTALL_DEPS=-i
GO_BUILD_TAGS='netgo unsafe'
GO_BUILD_FLAGS=$(GO_BUILD_INSTALL_DEPS) -ldflags "-extldflags \"-static\" -X main.version=$(SCOPE_VERSION) -s -w" -tags $(GO_BUILD_TAGS)
GOOS=$(shell go tool dist env | grep GOOS | sed -e 's/GOOS="\(.*\)"/\1/')

ifeq ($(GOOS),linux)
GO_ENV+=CGO_ENABLED=1
endif

ifeq ($(GOARCH),arm)
ARM_CC=CC=/usr/bin/arm-linux-gnueabihf-gcc
endif

GO=env $(GO_ENV) $(ARM_CC) go

NO_CROSS_COMP=unset GOOS GOARCH
GO_HOST=$(NO_CROSS_COMP); env $(GO_ENV) go
WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV)
IMAGE_TAG=$(shell ./tools/image-tag)

all: $(SCOPE_EXPORT)
Expand Down
9 changes: 6 additions & 3 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
FROM golang:1.7.4
FROM ubuntu:yakkety
ENV GOPATH /go
ENV GOVERSION 1.7
ENV PATH /go/bin:/usr/lib/go-${GOVERSION}/bin:/usr/bin:/bin:/usr/sbin:/sbin
RUN apt-get update && \
apt-get install -y libpcap-dev python-requests time file shellcheck && \
apt-get install -y libpcap-dev python-requests time file shellcheck golang-${GOVERSION} git gcc-arm-linux-gnueabihf && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN go clean -i net && \
go install -tags netgo std && \
Expand All @@ -13,7 +16,7 @@ RUN go get -tags netgo \
github.com/fatih/hclfmt \
github.com/mjibson/esc \
github.com/client9/misspell/cmd/misspell && \
chmod a+wr --recursive /usr/local/go/pkg && \
chmod a+wr --recursive /usr/lib/go-${GOVERSION}/pkg && \
rm -rf /go/pkg/ /go/src/
COPY build.sh /
ENTRYPOINT ["/build.sh"]
250 changes: 250 additions & 0 deletions probe/endpoint/connection_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package endpoint

import (
"strconv"

log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
)

// connectionTrackerConfig are the config options for the endpoint tracker.
type connectionTrackerConfig struct {
HostID string
HostName string
SpyProcs bool
UseConntrack bool
WalkProc bool
UseEbpfConn bool
ProcRoot string
BufferSize int
Scanner procspy.ConnectionScanner
DNSSnooper *DNSSnooper
}

type connectionTracker struct {
conf connectionTrackerConfig
flowWalker flowWalker // Interface
ebpfTracker eventTracker
reverseResolver *reverseResolver
processCache *process.CachingWalker
}

func newConnectionTracker(conf connectionTrackerConfig) connectionTracker {
if !conf.UseEbpfConn {
// ebpf OFF, use flowWalker
return connectionTracker{
conf: conf,
flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat"),
ebpfTracker: nil,
reverseResolver: newReverseResolver(),
}
}
// When ebpf will be active by default, check if it starts correctly otherwise fallback to flowWalk
et, err := newEbpfTracker(conf.UseEbpfConn)
if err != nil {
// TODO: fallback to flowWalker, when ebpf is enabled by default
log.Errorf("Error setting up the ebpfTracker, connections will not be reported: %s", err)
noopConnectionTracker := connectionTracker{
conf: conf,
flowWalker: nil,
ebpfTracker: nil,
reverseResolver: nil,
}
return noopConnectionTracker
}

var processCache *process.CachingWalker
processCache = process.NewCachingWalker(process.NewWalker(conf.ProcRoot))
processCache.Tick()

ct := connectionTracker{
conf: conf,
flowWalker: nil,
ebpfTracker: et,
reverseResolver: newReverseResolver(),
processCache: processCache,
}
go ct.getInitialState()
return ct
}

func flowToTuple(f flow) (ft fourTuple) {
ft = fourTuple{
f.Original.Layer3.SrcIP,
f.Original.Layer3.DstIP,
uint16(f.Original.Layer4.SrcPort),
uint16(f.Original.Layer4.DstPort),
}
// Handle DNAT-ed connections in the initial state
if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP {
ft = fourTuple{
f.Reply.Layer3.DstIP,
f.Reply.Layer3.SrcIP,
uint16(f.Reply.Layer4.DstPort),
uint16(f.Reply.Layer4.SrcPort),
}
}
return ft
}

// ReportConnections calls trackers accordingly to the configuration.
// When ebpf is enabled, only performEbpfTrack() is called
func (t *connectionTracker) ReportConnections(rpt *report.Report) {
hostNodeID := report.MakeHostNodeID(t.conf.HostID)

if t.ebpfTracker != nil {
t.performEbpfTrack(rpt, hostNodeID)
return
}

// seenTuples contains information about connections seen by conntrack and it will be passed to the /proc parser
seenTuples := map[string]fourTuple{}
if t.flowWalker != nil {
t.performFlowWalk(rpt, &seenTuples)
}
// if eBPF was enabled but failed to initialize, Scanner will be nil.
// We can't recover from this, so don't walk proc in that case.
// TODO: implement fallback
if t.conf.WalkProc && t.conf.Scanner != nil {
t.performWalkProc(rpt, hostNodeID, &seenTuples)
}
}

func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples *map[string]fourTuple) {
// Consult the flowWalker for short-lived connections
extraNodeInfo := map[string]string{
Conntracked: "true",
}
t.flowWalker.walkFlows(func(f flow, alive bool) {
tuple := flowToTuple(f)
(*seenTuples)[tuple.key()] = tuple
t.addConnection(rpt, tuple, "", extraNodeInfo, extraNodeInfo)
})
}

func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID string, seenTuples *map[string]fourTuple) error {
conns, err := t.conf.Scanner.Connections(t.conf.SpyProcs)
if err != nil {
return err
}
for conn := conns.Next(); conn != nil; conn = conns.Next() {
var (
namespaceID string
tuple = fourTuple{
conn.LocalAddress.String(),
conn.RemoteAddress.String(),
conn.LocalPort,
conn.RemotePort,
}
toNodeInfo = map[string]string{Procspied: "true"}
fromNodeInfo = map[string]string{Procspied: "true"}
)
if conn.Proc.PID > 0 {
fromNodeInfo[process.PID] = strconv.FormatUint(uint64(conn.Proc.PID), 10)
fromNodeInfo[report.HostNodeID] = hostNodeID
}

if conn.Proc.NetNamespaceID > 0 {
namespaceID = strconv.FormatUint(conn.Proc.NetNamespaceID, 10)
}

// If we've already seen this connection, we should know the direction
// (or have already figured it out), so we normalize and use the
// canonical direction. Otherwise, we can use a port-heuristic to guess
// the direction.
canonical, ok := (*seenTuples)[tuple.key()]
if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) {
tuple.reverse()
toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo
}
t.addConnection(rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo)
}
return nil
}

func (t *connectionTracker) getInitialState() {
scanner := procspy.NewSyncConnectionScanner(t.processCache)
// Run conntrack and proc parsing synchronously only once to initialize ebpfTracker
seenTuples := map[string]fourTuple{}
// Consult the flowWalker to get the initial state
if err := IsConntrackSupported(t.conf.ProcRoot); t.conf.UseConntrack && err != nil {
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
} else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil {
log.Errorf("conntrack existingConnections error: %v", err)
} else {
for _, f := range existingFlows {
tuple := flowToTuple(f)
seenTuples[tuple.key()] = tuple
}
}

conns, err := scanner.Connections(t.conf.SpyProcs)
if err != nil {
log.Errorf("Error initializing ebpfTracker while scanning /proc, continuing without initial connections: %s", err)
}
scanner.Stop()

t.ebpfTracker.feedInitialConnections(conns, seenTuples, report.MakeHostNodeID(t.conf.HostID))
}

func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {
t.ebpfTracker.walkConnections(func(e ebpfConnection) {
fromNodeInfo := map[string]string{
EBPF: "true",
}
toNodeInfo := map[string]string{
EBPF: "true",
}
if e.pid > 0 {
fromNodeInfo[process.PID] = strconv.Itoa(e.pid)
fromNodeInfo[report.HostNodeID] = hostNodeID
}

if e.incoming {
t.addConnection(rpt, reverse(e.tuple), e.networkNamespace, toNodeInfo, fromNodeInfo)
} else {
t.addConnection(rpt, e.tuple, e.networkNamespace, fromNodeInfo, toNodeInfo)
}

})
return nil
}

func (t *connectionTracker) addConnection(rpt *report.Report, ft fourTuple, namespaceID string, extraFromNode, extraToNode map[string]string) {
var (
fromNode = t.makeEndpointNode(namespaceID, ft.fromAddr, ft.fromPort, extraFromNode)
toNode = t.makeEndpointNode(namespaceID, ft.toAddr, ft.toPort, extraToNode)
)
rpt.Endpoint = rpt.Endpoint.AddNode(fromNode.WithEdge(toNode.ID, report.EdgeMetadata{}))
rpt.Endpoint = rpt.Endpoint.AddNode(toNode)
}

func (t *connectionTracker) makeEndpointNode(namespaceID string, addr string, port uint16, extra map[string]string) report.Node {
portStr := strconv.Itoa(int(port))
node := report.MakeNodeWith(
report.MakeEndpointNodeID(t.conf.HostID, namespaceID, addr, portStr),
map[string]string{Addr: addr, Port: portStr})
if names := t.conf.DNSSnooper.CachedNamesForIP(addr); len(names) > 0 {
node = node.WithSet(SnoopedDNSNames, report.MakeStringSet(names...))
}
if names, err := t.reverseResolver.get(addr); err == nil && len(names) > 0 {
node = node.WithSet(ReverseDNSNames, report.MakeStringSet(names...))
}
if extra != nil {
node = node.WithLatests(extra)
}
return node
}

func (t *connectionTracker) Stop() error {
if t.ebpfTracker != nil {
t.ebpfTracker.stop()
}
if t.flowWalker != nil {
t.flowWalker.stop()
}
t.reverseResolver.stop()
return nil
}
18 changes: 9 additions & 9 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ type conntrack struct {
// flowWalker is something that maintains flows, and provides an accessor
// method to walk them.
type flowWalker interface {
walkFlows(f func(flow))
walkFlows(f func(f flow, active bool))
stop()
}

type nilFlowWalker struct{}

func (n nilFlowWalker) stop() {}
func (n nilFlowWalker) walkFlows(f func(flow)) {}
func (n nilFlowWalker) stop() {}
func (n nilFlowWalker) walkFlows(f func(flow, bool)) {}

// conntrackWalker uses the conntrack command to track network connections and
// implement flowWalker.
Expand Down Expand Up @@ -160,7 +160,7 @@ func logPipe(prefix string, reader io.Reader) {
func (c *conntrackWalker) run() {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
existingFlows, err := c.existingConnections()
existingFlows, err := existingConnections(c.args)
if err != nil {
log.Errorf("conntrack existingConnections error: %v", err)
return
Expand Down Expand Up @@ -354,8 +354,8 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
return f, nil
}

func (c *conntrackWalker) existingConnections() ([]flow, error) {
args := append([]string{"-L", "-o", "id", "-p", "tcp"}, c.args...)
func existingConnections(conntrackWalkerArgs []string) ([]flow, error) {
args := append([]string{"-L", "-o", "id", "-p", "tcp"}, conntrackWalkerArgs...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
Expand Down Expand Up @@ -463,14 +463,14 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {

// walkFlows calls f with all active flows and flows that have come and gone
// since the last call to walkFlows
func (c *conntrackWalker) walkFlows(f func(flow)) {
func (c *conntrackWalker) walkFlows(f func(flow, bool)) {
c.Lock()
defer c.Unlock()
for _, flow := range c.activeFlows {
f(flow)
f(flow, true)
}
for _, flow := range c.bufferedFlows {
f(flow)
f(flow, false)
}
c.bufferedFlows = c.bufferedFlows[:0]
}
Loading

0 comments on commit a422998

Please sign in to comment.