From 020d9670e29b0fe3d75f2041820438a4513e18ca Mon Sep 17 00:00:00 2001 From: Lorenzo Manacorda Date: Fri, 28 Oct 2016 14:58:47 +0200 Subject: [PATCH] Add eBPF connection tracking without dependencies on kernel headers Based on work from Lorenzo, updated by Iago, Alban, Alessandro and Michael. This PR adds connection tracking using eBPF. This feature is not enabled by default. For now, you can enable it by launching scope with the following command: ``` sudo ./scope launch --probe.ebpf.connections=true ``` This patch allows scope to get notified of every connection event, without relying on the parsing of /proc/$pid/net/tcp{,6} and /proc/$pid/fd/*, and therefore improves performance. We vendor https://github.com/iovisor/gobpf in Scope to load the pre-compiled ebpf program and https://github.com/kinvolk/tcptracer-bpf to guess the offsets of the structures we need in the kernel. In this way we don't need a different pre-compiled ebpf object file per kernel. The Scope build fetches the pre-compiled ebpf program from https://hub.docker.com/r/kinvolk/tcptracer-bpf/ (see https://github.com/kinvolk/tcptracer-bpf). To update to a new version you can modify the EBPF_IMAGE variable in Makefile. The ebpf program uses kprobes/kretprobes on the following kernel functions: - tcp_v4_connect - tcp_v6_connect - tcp_set_state - inet_csk_accept - tcp_close It generates "connect", "accept" and "close" events containing the connection tuple but also pid and netns. Note: the IPv6 events are not supported in Scope and thus not passed on. probe/endpoint/ebpf.go maintains the list of connections. Similarly to conntrack, it also keeps the dead connections for one iteration in order to report short-lived connections. The code for parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/* is still there and still used at start-up because eBPF only brings us the events and not the initial state. 
However, the /proc parsing for the initial state is now done in foreground instead of background, via newForegroundReader(). NAT resolution on connections from eBPF works in the same way as it did on connections from /proc: by using conntrack. One of the two conntrack instances is only started to get the initial state and then it is stopped since eBPF detects short-lived connections. The Scope Docker image size comparison: - weaveworks/scope in current master: 22 MB (compressed), 68 MB (uncompressed) - weaveworks/scope with this patchset: 23 MB (compressed), 69 MB (uncompressed) Fixes #1168 (walking /proc to obtain connections is very expensive) Fixes #1260 (Short-lived connections not tracked for containers in shared networking namespaces) Fixes #1962 (Port ebpf tracker to Go) Fixes #1961 (Remove runtime kernel header dependency from ebpf tracker) --- .gitignore | 1 + Makefile | 31 ++- backend/Dockerfile | 6 +- circle.yml | 2 +- docker/Dockerfile | 1 + probe/endpoint/conntrack.go | 12 +- probe/endpoint/ebpf.go | 256 ++++++++++++++++++ probe/endpoint/ebpf_linux.go | 7 + probe/endpoint/ebpf_unsupported.go | 9 + probe/endpoint/four_tuple.go | 44 +++ probe/endpoint/nat.go | 2 +- probe/endpoint/nat_internal_test.go | 4 +- .../procspy/background_reader_linux.go | 22 ++ probe/endpoint/procspy/spy_darwin.go | 5 + probe/endpoint/procspy/spy_linux.go | 6 + probe/endpoint/reporter.go | 105 +++++-- prog/main.go | 2 + prog/probe.go | 81 +++--- scope | 1 + 19 files changed, 517 insertions(+), 80 deletions(-) create mode 100644 probe/endpoint/ebpf.go create mode 100644 probe/endpoint/ebpf_linux.go create mode 100644 probe/endpoint/ebpf_unsupported.go create mode 100644 probe/endpoint/four_tuple.go diff --git a/.gitignore b/.gitignore index 766b018a54..af4a77f92d 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,7 @@ scope.tar prog/scope docker/scope docker/docker.tgz +docker/ebpf.tgz docker/weave docker/runsvinit extras/fixprobe/fixprobe diff --git a/Makefile b/Makefile 
index c6928a28cc..129a70807e 100644 --- a/Makefile +++ b/Makefile @@ -23,14 +23,31 @@ RM=--rm RUN_FLAGS=-ti BUILD_IN_CONTAINER=true GO_ENV=GOGC=off -GO=env $(GO_ENV) go -NO_CROSS_COMP=unset GOOS GOARCH -GO_HOST=$(NO_CROSS_COMP); $(GO) -WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV) +GO_ENV_ARM=$(GO_ENV) CC=/usr/bin/arm-linux-gnueabihf-gcc GO_BUILD_INSTALL_DEPS=-i GO_BUILD_TAGS='netgo unsafe' GO_BUILD_FLAGS=$(GO_BUILD_INSTALL_DEPS) -ldflags "-extldflags \"-static\" -X main.version=$(SCOPE_VERSION) -s -w" -tags $(GO_BUILD_TAGS) + +ifeq ($(GOOS),linux) +GO_ENV+=CGO_ENABLED=1 +endif + +ifeq ($(GOARCH),arm) +GO=env $(GO_ENV_ARM) go +# The version of go shipped on debian doesn't have some standard library +# packages for arm and when it tries to install them it fails because it +# doesn't have permission to write to /usr/lib +# Use -pkgdir if we build for arm so packages are installed in $HOME +GO_BUILD_FLAGS+=-pkgdir ~ +else +GO=env $(GO_ENV) go +endif + +NO_CROSS_COMP=unset GOOS GOARCH +GO_HOST=$(NO_CROSS_COMP); env $(GO_ENV) go +WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV) IMAGE_TAG=$(shell ./tools/image-tag) +EBPF_IMAGE=kinvolk/tcptracer-bpf:master-769adde all: $(SCOPE_EXPORT) @@ -41,7 +58,11 @@ docker/weave: curl -L git.io/weave -o docker/weave chmod u+x docker/weave -$(SCOPE_EXPORT): $(SCOPE_EXE) $(DOCKER_DISTRIB) docker/weave $(RUNSVINIT) docker/Dockerfile docker/demo.json docker/run-app docker/run-probe docker/entrypoint.sh +docker/ebpf.tgz: Makefile + $(SUDO) docker pull $(EBPF_IMAGE) + CONTAINER_ID=$(shell $(SUDO) docker run -d $(EBPF_IMAGE) /bin/false 2>/dev/null || true); $(SUDO) docker export -o docker/ebpf.tgz $${CONTAINER_ID} + +$(SCOPE_EXPORT): $(SCOPE_EXE) $(DOCKER_DISTRIB) docker/weave $(RUNSVINIT) docker/Dockerfile docker/demo.json docker/run-app docker/run-probe docker/entrypoint.sh docker/ebpf.tgz cp $(SCOPE_EXE) $(RUNSVINIT) docker/ cp $(DOCKER_DISTRIB) docker/docker.tgz $(SUDO) docker build -t $(SCOPE_IMAGE) docker/ diff --git a/backend/Dockerfile 
b/backend/Dockerfile index 982663b834..198cc1d420 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -1,6 +1,8 @@ -FROM golang:1.7.1 +FROM ubuntu:yakkety +ENV GOPATH /go +ENV PATH /go/bin:/usr/lib/go-1.7/bin:/usr/bin:/bin:/usr/sbin:/sbin RUN apt-get update && \ - apt-get install -y libpcap-dev python-requests time file shellcheck && \ + apt-get install -y libpcap-dev python-requests time file shellcheck golang-1.7 git gcc-arm-linux-gnueabihf && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN go clean -i net && \ go install -tags netgo std && \ diff --git a/circle.yml b/circle.yml index 47ed292d7c..fa1054891c 100644 --- a/circle.yml +++ b/circle.yml @@ -41,7 +41,7 @@ test: parallel: true - cd $SRCDIR; make RM= client-lint static: parallel: true - - cd $SRCDIR; rm -f prog/scope; if [ "$CIRCLE_NODE_INDEX" = "0" ]; then GOARCH=arm make GO_BUILD_INSTALL_DEPS= RM= prog/scope; else GOOS=darwin make GO_BUILD_INSTALL_DEPS= RM= prog/scope; fi: + - cd $SRCDIR; rm -f prog/scope; if [ "$CIRCLE_NODE_INDEX" = "0" ]; then GOARCH=arm GOOS=linux make GO_BUILD_INSTALL_DEPS= RM= prog/scope; else GOOS=darwin GOOS=linux make GO_BUILD_INSTALL_DEPS= RM= prog/scope; fi: parallel: true - cd $SRCDIR; rm -f prog/scope; make RM=: parallel: true diff --git a/docker/Dockerfile b/docker/Dockerfile index 029177008b..dadd6649f3 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -6,6 +6,7 @@ RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/community" >>/etc/apk/reposi apk add --update bash runit conntrack-tools iproute2 util-linux curl && \ rm -rf /var/cache/apk/* ADD ./docker.tgz / +ADD ./ebpf.tgz /usr/libexec/scope/ ADD ./demo.json / ADD ./weave /usr/bin/ COPY ./scope ./runsvinit ./entrypoint.sh /home/weave/ diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 3f4711ad73..4879dc26cd 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -65,14 +65,14 @@ type conntrack struct { // flowWalker is something that maintains flows, and 
provides an accessor // method to walk them. type flowWalker interface { - walkFlows(f func(flow)) + walkFlows(f func(flow, bool)) stop() } type nilFlowWalker struct{} -func (n nilFlowWalker) stop() {} -func (n nilFlowWalker) walkFlows(f func(flow)) {} +func (n nilFlowWalker) stop() {} +func (n nilFlowWalker) walkFlows(f func(flow, bool)) {} // conntrackWalker uses the conntrack command to track network connections and // implement flowWalker. @@ -463,14 +463,14 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { // walkFlows calls f with all active flows and flows that have come and gone // since the last call to walkFlows -func (c *conntrackWalker) walkFlows(f func(flow)) { +func (c *conntrackWalker) walkFlows(f func(flow, bool)) { c.Lock() defer c.Unlock() for _, flow := range c.activeFlows { - f(flow) + f(flow, true) } for _, flow := range c.bufferedFlows { - f(flow) + f(flow, false) } c.bufferedFlows = c.bufferedFlows[:0] } diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go new file mode 100644 index 0000000000..6c96b426d1 --- /dev/null +++ b/probe/endpoint/ebpf.go @@ -0,0 +1,256 @@ +package endpoint + +import ( + "bytes" + "encoding/binary" + "net" + "strconv" + "sync" + + log "github.com/Sirupsen/logrus" + bpflib "github.com/iovisor/gobpf/elf" + "github.com/kinvolk/tcptracer-bpf/pkg/byteorder" + "github.com/kinvolk/tcptracer-bpf/pkg/offsetguess" +) + +type eventType uint32 + +// These constants should be in sync with the equivalent definitions in the ebpf program. +const ( + _ eventType = iota + EventConnect + EventAccept + EventClose +) + +func (e eventType) String() string { + switch e { + case EventConnect: + return "connect" + case EventAccept: + return "accept" + case EventClose: + return "close" + default: + return "unknown" + } +} + +// tcpEvent should be in sync with the struct in the ebpf maps. 
+type tcpEvent struct { + // Timestamp must be the first field, the sorting depends on it + Timestamp uint64 + + CPU uint64 + Type uint32 + Pid uint32 + Comm [16]byte + SAddr uint32 + DAddr uint32 + SPort uint16 + DPort uint16 + NetNS uint32 +} + +// An ebpfConnection represents a TCP connection +type ebpfConnection struct { + tuple fourTuple + networkNamespace string + incoming bool + pid int +} + +type eventTracker interface { + handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) + hasDied() bool + run() + walkConnections(f func(ebpfConnection)) + initialize() + isInitialized() bool + stop() +} + +var ebpfTracker *EbpfTracker + +// nilTracker is a tracker that does nothing, and it implements the eventTracker interface. +// It is returned when the useEbpfConn flag is false. +type nilTracker struct{} + +func (n nilTracker) handleConnection(_ string, _ fourTuple, _ int, _ string) {} +func (n nilTracker) hasDied() bool { return true } +func (n nilTracker) run() {} +func (n nilTracker) walkConnections(f func(ebpfConnection)) {} +func (n nilTracker) initialize() {} +func (n nilTracker) isInitialized() bool { return false } +func (n nilTracker) stop() {} + +// EbpfTracker contains the sets of open and closed TCP connections. +// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`. 
+type EbpfTracker struct { + sync.Mutex + reader *bpflib.Module + initialized bool + dead bool + + openConnections map[string]ebpfConnection + closedConnections []ebpfConnection +} + +func newEbpfTracker(useEbpfConn bool) eventTracker { + if !useEbpfConn { + return &nilTracker{} + } + + bpfObjectFile, err := findBpfObjectFile() + if err != nil { + log.Errorf("Cannot find BPF object file: %v", err) + return &nilTracker{} + } + + bpfPerfEvent := bpflib.NewModule(bpfObjectFile) + if bpfPerfEvent == nil { + return &nilTracker{} + } + err = bpfPerfEvent.Load() + if err != nil { + log.Errorf("Error loading BPF program: %v", err) + return &nilTracker{} + } + + bpfPerfEvent.EnableKprobes() + + tracker := &EbpfTracker{ + openConnections: map[string]ebpfConnection{}, + reader: bpfPerfEvent, + } + tracker.run() + + ebpfTracker = tracker + return tracker +} + +func (t *EbpfTracker) handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) { + t.Lock() + defer t.Unlock() + log.Debugf("handleConnection(%v, [%v:%v --> %v:%v], pid=%v, netNS=%v)", + eventType, tuple.fromAddr, tuple.fromPort, tuple.toAddr, tuple.toPort, pid, networkNamespace) + + switch eventType { + case "connect": + conn := ebpfConnection{ + incoming: false, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case "accept": + conn := ebpfConnection{ + incoming: true, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case "close": + if deadConn, ok := t.openConnections[tuple.String()]; ok { + delete(t.openConnections, tuple.String()) + t.closedConnections = append(t.closedConnections, deadConn) + } else { + log.Errorf("EbpfTracker error: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace) + } + } +} + +func tcpEventCallback(event tcpEvent) { + var alive bool + typ := eventType(event.Type) + pid := event.Pid & 0xffffffff + + 
saddrbuf := make([]byte, 4) + daddrbuf := make([]byte, 4) + + byteorder.Host.PutUint32(saddrbuf, uint32(event.SAddr)) + byteorder.Host.PutUint32(daddrbuf, uint32(event.DAddr)) + + sIP := net.IPv4(saddrbuf[0], saddrbuf[1], saddrbuf[2], saddrbuf[3]) + dIP := net.IPv4(daddrbuf[0], daddrbuf[1], daddrbuf[2], daddrbuf[3]) + + sport := event.SPort + dport := event.DPort + + if typ.String() == "close" || typ.String() == "unknown" { + alive = true + } else { + alive = false + } + tuple := fourTuple{sIP.String(), dIP.String(), uint16(sport), uint16(dport), alive} + + log.Debugf("tcpEventCallback(%v, [%v:%v --> %v:%v], pid=%v, netNS=%v, cpu=%v, ts=%v)", + typ.String(), tuple.fromAddr, tuple.fromPort, tuple.toAddr, tuple.toPort, pid, event.NetNS, event.CPU, event.Timestamp) + ebpfTracker.handleConnection(typ.String(), tuple, int(pid), strconv.FormatUint(uint64(event.NetNS), 10)) +} + +// walkConnections calls f with all open connections and connections that have come and gone +// since the last call to walkConnections +func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) { + t.Lock() + defer t.Unlock() + + for _, connection := range t.openConnections { + f(connection) + } + for _, connection := range t.closedConnections { + f(connection) + } + t.closedConnections = t.closedConnections[:0] +} + +func (t *EbpfTracker) run() { + if err := offsetguess.Guess(t.reader); err != nil { + log.Errorf("%v\n", err) + return + } + + channel := make(chan []byte) + + go func() { + var event tcpEvent + for { + data := <-channel + err := binary.Read(bytes.NewBuffer(data), byteorder.Host, &event) + if err != nil { + log.Errorf("Failed to decode received data: %s\n", err) + continue + } + tcpEventCallback(event) + } + }() + + pmIPv4, err := bpflib.InitPerfMap(t.reader, "tcp_event_ipv4", channel) + if err != nil { + log.Errorf("%v\n", err) + return + } + + pmIPv4.PollStart() +} + +func (t *EbpfTracker) hasDied() bool { + t.Lock() + defer t.Unlock() + + return t.dead +} + +func (t 
*EbpfTracker) initialize() { + t.initialized = true +} + +func (t *EbpfTracker) isInitialized() bool { + return t.initialized +} + +func (t *EbpfTracker) stop() { + // TODO: stop the go routine in run() +} diff --git a/probe/endpoint/ebpf_linux.go b/probe/endpoint/ebpf_linux.go new file mode 100644 index 0000000000..352c03ef07 --- /dev/null +++ b/probe/endpoint/ebpf_linux.go @@ -0,0 +1,7 @@ +//+build linux + +package endpoint + +func findBpfObjectFile() (string, error) { + return "/usr/libexec/scope/ebpf/ebpf.o", nil +} diff --git a/probe/endpoint/ebpf_unsupported.go b/probe/endpoint/ebpf_unsupported.go new file mode 100644 index 0000000000..011181fa96 --- /dev/null +++ b/probe/endpoint/ebpf_unsupported.go @@ -0,0 +1,9 @@ +//+build !linux + +package endpoint + +import "fmt" + +func findBpfObjectFile() (string, error) { + return "", fmt.Errorf("not supported") +} diff --git a/probe/endpoint/four_tuple.go b/probe/endpoint/four_tuple.go new file mode 100644 index 0000000000..2624ecd4b6 --- /dev/null +++ b/probe/endpoint/four_tuple.go @@ -0,0 +1,44 @@ +package endpoint + +import ( + "fmt" + "sort" + "strings" +) + +// fourTuple is an (IP, port, IP, port) tuple, representing a connection +type fourTuple struct { + fromAddr, toAddr string + fromPort, toPort uint16 + alive bool +} + +func (t fourTuple) String() string { + return fmt.Sprintf("%s:%d-%s:%d", t.fromAddr, t.fromPort, t.toAddr, t.toPort) +} + +// key is a sortable direction-independent key for tuples, used to look up a +// fourTuple when you are unsure of its direction. 
+func (t fourTuple) key() string { + key := []string{ + fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), + fmt.Sprintf("%s:%d", t.toAddr, t.toPort), + } + sort.Strings(key) + return strings.Join(key, " ") +} + +// reverse flips the direction of the tuple +func (t *fourTuple) reverse() { + t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort +} + +// reverse flips the direction of a tuple, without side effects +func reverse(tuple fourTuple) fourTuple { + return fourTuple{ + fromAddr: tuple.toAddr, + toAddr: tuple.fromAddr, + fromPort: tuple.toPort, + toPort: tuple.fromPort, + } +} diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 88c9e8d696..382fe514fa 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -49,7 +49,7 @@ func toMapping(f flow) *endpointMapping { // applyNAT duplicates Nodes in the endpoint topology of a report, based on // the NAT table. func (n natMapper) applyNAT(rpt report.Report, scope string) { - n.flowWalker.walkFlows(func(f flow) { + n.flowWalker.walkFlows(func(f flow, alive bool) { mapping := toMapping(f) realEndpointPort := strconv.Itoa(mapping.originalPort) diff --git a/probe/endpoint/nat_internal_test.go b/probe/endpoint/nat_internal_test.go index 2214be5f44..b88333c305 100644 --- a/probe/endpoint/nat_internal_test.go +++ b/probe/endpoint/nat_internal_test.go @@ -13,9 +13,9 @@ type mockFlowWalker struct { flows []flow } -func (m *mockFlowWalker) walkFlows(f func(flow)) { +func (m *mockFlowWalker) walkFlows(f func(f flow, alive bool)) { for _, flow := range m.flows { - f(flow) + f(flow, true) } } diff --git a/probe/endpoint/procspy/background_reader_linux.go b/probe/endpoint/procspy/background_reader_linux.go index d618998a60..7c729df3e1 100644 --- a/probe/endpoint/procspy/background_reader_linux.go +++ b/probe/endpoint/procspy/background_reader_linux.go @@ -38,6 +38,28 @@ func newBackgroundReader(walker process.Walker) *backgroundReader { return br } +func 
newForegroundReader(walker process.Walker) *backgroundReader { + br := &backgroundReader{ + stopc: make(chan struct{}), + latestSockets: map[uint64]*Proc{}, + } + var ( + walkc = make(chan walkResult) + ticker = time.NewTicker(time.Millisecond) // fire every millisecond + pWalker = newPidWalker(walker, ticker.C, fdBlockSize) + ) + + go performWalk(pWalker, walkc) + + result := <-walkc + br.mtx.Lock() + br.latestBuf = result.buf + br.latestSockets = result.sockets + br.mtx.Unlock() + + return br +} + func (br *backgroundReader) stop() { close(br.stopc) } diff --git a/probe/endpoint/procspy/spy_darwin.go b/probe/endpoint/procspy/spy_darwin.go index bec6f2a132..394407f39f 100644 --- a/probe/endpoint/procspy/spy_darwin.go +++ b/probe/endpoint/procspy/spy_darwin.go @@ -18,6 +18,11 @@ func NewConnectionScanner(_ process.Walker) ConnectionScanner { return &darwinScanner{} } +// NewSyncConnectionScanner creates a new synchronous Darwin ConnectionScanner +func NewSyncConnectionScanner(_ process.Walker) ConnectionScanner { + return &darwinScanner{} +} + type darwinScanner struct{} // Connections returns all established (TCP) connections. 
No need to be root diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 6966f852f9..cac7661cd5 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -38,6 +38,12 @@ func NewConnectionScanner(walker process.Walker) ConnectionScanner { return &linuxScanner{br} } +// NewSyncConnectionScanner creates a new synchronous Linux ConnectionScanner +func NewSyncConnectionScanner(walker process.Walker) ConnectionScanner { + br := newForegroundReader(walker) + return &linuxScanner{br} +} + type linuxScanner struct { br *backgroundReader } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index f58fce91a1..301dbf2374 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,14 +1,12 @@ package endpoint import ( - "fmt" - "sort" "strconv" - "strings" "time" "github.com/prometheus/client_golang/prometheus" + log "github.com/Sirupsen/logrus" "github.com/weaveworks/scope/probe/endpoint/procspy" "github.com/weaveworks/scope/probe/process" "github.com/weaveworks/scope/report" @@ -19,6 +17,7 @@ const ( Addr = "addr" // typically IPv4 Port = "port" Conntracked = "conntracked" + EBPF = "eBPF" Procspied = "procspied" ReverseDNSNames = "reverse_dns_names" SnoopedDNSNames = "snooped_dns_names" @@ -31,6 +30,7 @@ type ReporterConfig struct { SpyProcs bool UseConntrack bool WalkProc bool + UseEbpfConn bool ProcRoot string BufferSize int Scanner procspy.ConnectionScanner @@ -41,6 +41,7 @@ type ReporterConfig struct { type Reporter struct { conf ReporterConfig flowWalker flowWalker // interface + ebpfTracker eventTracker natMapper natMapper reverseResolver *reverseResolver } @@ -66,6 +67,7 @@ func NewReporter(conf ReporterConfig) *Reporter { return &Reporter{ conf: conf, flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize), + ebpfTracker: newEbpfTracker(conf.UseEbpfConn), natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, 
conf.ProcRoot, conf.BufferSize, "--any-nat")), reverseResolver: newReverseResolver(), } @@ -80,27 +82,7 @@ func (r *Reporter) Stop() { r.natMapper.stop() r.reverseResolver.stop() r.conf.Scanner.Stop() -} - -type fourTuple struct { - fromAddr, toAddr string - fromPort, toPort uint16 -} - -// key is a sortable direction-independent key for tuples, used to look up a -// fourTuple, when you are unsure of it's direction. -func (t fourTuple) key() string { - key := []string{ - fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), - fmt.Sprintf("%s:%d", t.toAddr, t.toPort), - } - sort.Strings(key) - return strings.Join(key, " ") -} - -// reverse flips the direction of the tuple -func (t *fourTuple) reverse() { - t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort + r.ebpfTracker.stop() } // Report implements Reporter. @@ -111,19 +93,22 @@ func (r *Reporter) Report() (report.Report, error) { hostNodeID := report.MakeHostNodeID(r.conf.HostID) rpt := report.MakeReport() + seenTuples := map[string]fourTuple{} // Consult the flowWalker for short-lived connections - { + // With eBPF, this is used only in the first round to build seenTuples for WalkProc + if r.conf.WalkProc || !r.conf.UseEbpfConn { extraNodeInfo := map[string]string{ Conntracked: "true", } - r.flowWalker.walkFlows(func(f flow) { + r.flowWalker.walkFlows(func(f flow, alive bool) { tuple := fourTuple{ f.Original.Layer3.SrcIP, f.Original.Layer3.DstIP, uint16(f.Original.Layer4.SrcPort), uint16(f.Original.Layer4.DstPort), + alive, } // Handle DNAT-ed short-lived connections. 
// The NAT mapper won't help since it only runs periodically, @@ -134,6 +119,7 @@ func (r *Reporter) Report() (report.Report, error) { f.Reply.Layer3.SrcIP, uint16(f.Reply.Layer4.DstPort), uint16(f.Reply.Layer4.SrcPort), + alive, } } @@ -144,6 +130,7 @@ func (r *Reporter) Report() (report.Report, error) { if r.conf.WalkProc { conns, err := r.conf.Scanner.Connections(r.conf.SpyProcs) + defer r.procParsingSwitcher() if err != nil { return rpt, err } @@ -155,6 +142,7 @@ func (r *Reporter) Report() (report.Report, error) { conn.RemoteAddress.String(), conn.LocalPort, conn.RemotePort, + true, } toNodeInfo = map[string]string{Procspied: "true"} fromNodeInfo = map[string]string{Procspied: "true"} @@ -174,13 +162,46 @@ func (r *Reporter) Report() (report.Report, error) { // the direction. canonical, ok := seenTuples[tuple.key()] if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) { - tuple.reverse() - toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo + if tuple.alive { + r.feedToEbpf(tuple, true, int(conn.Proc.PID), namespaceID) + } + r.addConnection(&rpt, reverse(tuple), namespaceID, toNodeInfo, fromNodeInfo) + } else { + if tuple.alive { + r.feedToEbpf(tuple, false, int(conn.Proc.PID), namespaceID) + } + r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) } - r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) + } } + // eBPF + if r.conf.UseEbpfConn && !r.ebpfTracker.hasDied() { + r.ebpfTracker.walkConnections(func(e ebpfConnection) { + fromNodeInfo := map[string]string{ + Procspied: "true", + EBPF: "true", + } + toNodeInfo := map[string]string{ + Procspied: "true", + EBPF: "true", + } + if e.pid > 0 { + fromNodeInfo[process.PID] = strconv.Itoa(e.pid) + fromNodeInfo[report.HostNodeID] = hostNodeID + } + log.Debugf("Report: ebpfTracker %v (%v) (%v)", e.tuple, e.pid, e.incoming) + + if e.incoming { + r.addConnection(&rpt, reverse(e.tuple), e.networkNamespace, toNodeInfo, fromNodeInfo) + } else { + r.addConnection(&rpt, 
e.tuple, e.networkNamespace, fromNodeInfo, toNodeInfo) + } + + }) + } + r.natMapper.applyNAT(rpt, r.conf.HostID) return rpt, nil } @@ -214,3 +235,29 @@ func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16 func newu64(i uint64) *uint64 { return &i } + +// procParsingSwitcher make sure that if eBPF tracking is enabled, +// connections coming from /proc parsing are only walked once. +func (r *Reporter) procParsingSwitcher() { + if r.conf.WalkProc && r.conf.UseEbpfConn { + r.conf.WalkProc = false + r.ebpfTracker.initialize() + + r.flowWalker.stop() + } +} + +// if the eBPF tracker is enabled, feed the existing connections into it +// incoming connections correspond to "accept" events +// outgoing connections correspond to "connect" events +func (r Reporter) feedToEbpf(tuple fourTuple, incoming bool, pid int, namespaceID string) { + if r.conf.UseEbpfConn && !r.ebpfTracker.isInitialized() { + tcpEventType := "connect" + + if incoming { + tcpEventType = "accept" + } + + r.ebpfTracker.handleConnection(tcpEventType, tuple, pid, namespaceID) + } +} diff --git a/prog/main.go b/prog/main.go index c2a46853a3..046af653b5 100644 --- a/prog/main.go +++ b/prog/main.go @@ -97,6 +97,7 @@ type probeFlags struct { spyProcs bool // Associate endpoints with processes (must be root) procEnabled bool // Produce process topology & process nodes in endpoint + useEbpfConn bool // Enable connection tracking with eBPF procRoot string dockerEnabled bool @@ -276,6 +277,7 @@ func main() { flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)") flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem") flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections") + flag.BoolVar(&flags.probe.useEbpfConn, "probe.ebpf.connections", false, "enable connection tracking with eBPF") // Docker 
flag.BoolVar(&flags.probe.dockerEnabled, "probe.docker", false, "collect Docker-related attributes for processes") diff --git a/prog/probe.go b/prog/probe.go index 32d835b982..d951bebfc6 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -46,25 +46,46 @@ var ( dockerEndpoint = "unix:///var/run/docker.sock" ) -func check(flags map[string]string) { - handleResponse := func(r *checkpoint.CheckResponse, err error) { - if err != nil { - log.Errorf("Error checking version: %v", err) - } else if r.Outdated { - log.Infof("Scope version %s is available; please update at %s", - r.CurrentVersion, r.CurrentDownloadURL) - } +func checkNewScopeVersion(flags probeFlags) { + checkpointFlags := map[string]string{} + if flags.kubernetesEnabled { + checkpointFlags["kubernetes_enabled"] = "true" + } + if flags.ecsEnabled { + checkpointFlags["ecs_enabled"] = "true" } - // Start background version checking - params := checkpoint.CheckParams{ - Product: "scope-probe", - Version: version, - Flags: flags, + go func() { + handleResponse := func(r *checkpoint.CheckResponse, err error) { + if err != nil { + log.Errorf("Error checking version: %v", err) + } else if r.Outdated { + log.Infof("Scope version %s is available; please update at %s", + r.CurrentVersion, r.CurrentDownloadURL) + } + } + + // Start background version checking + params := checkpoint.CheckParams{ + Product: "scope-probe", + Version: version, + Flags: checkpointFlags, + } + resp, err := checkpoint.Check(¶ms) + handleResponse(resp, err) + checkpoint.CheckInterval(¶ms, versionCheckPeriod, handleResponse) + }() +} + +func maybeExportProfileData(flags probeFlags) { + if flags.httpListen != "" { + go func() { + http.Handle("/metrics", prometheus.Handler()) + log.Infof("Profiling data being exported to %s", flags.httpListen) + log.Infof("go tool proof http://%s/debug/pprof/{profile,heap,block}", flags.httpListen) + log.Infof("Profiling endpoint %s terminated: %v", flags.httpListen, http.ListenAndServe(flags.httpListen, nil)) + }() 
} - resp, err := checkpoint.Check(¶ms) - handleResponse(resp, err) - checkpoint.CheckInterval(¶ms, versionCheckPeriod, handleResponse) } // Main runs the probe @@ -91,15 +112,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { hostID = hostName // TODO(pb): we should sanitize the hostname ) log.Infof("probe starting, version %s, ID %s", version, probeID) - - checkpointFlags := makeBaseCheckpointFlags() - if flags.kubernetesEnabled { - checkpointFlags["kubernetes_enabled"] = "true" - } - if flags.ecsEnabled { - checkpointFlags["ecs_enabled"] = "true" - } - go check(checkpointFlags) + checkNewScopeVersion(flags) handlerRegistry := controls.NewDefaultHandlerRegistry() clientFactory := func(hostname string, url url.URL) (appclient.AppClient, error) { @@ -148,8 +161,14 @@ func probeMain(flags probeFlags, targets []appclient.Target) { var scanner procspy.ConnectionScanner if flags.procEnabled { processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot)) - scanner = procspy.NewConnectionScanner(processCache) + processCache.Tick() p.AddTicker(processCache) + // if eBPF tracking is enabled, scan /proc synchronously, and just once + if flags.useEbpfConn { + scanner = procspy.NewSyncConnectionScanner(processCache) + } else { + scanner = procspy.NewConnectionScanner(processCache) + } p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies)) } @@ -166,6 +185,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { SpyProcs: flags.spyProcs, UseConntrack: flags.useConntrack, WalkProc: flags.procEnabled, + UseEbpfConn: flags.useEbpfConn, ProcRoot: flags.procRoot, BufferSize: flags.conntrackBufferSize, Scanner: scanner, @@ -263,14 +283,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { p.AddReporter(pluginRegistry) } - if flags.httpListen != "" { - go func() { - http.Handle("/metrics", prometheus.Handler()) - log.Infof("Profiling data being exported to %s", flags.httpListen) - log.Infof("go tool 
proof http://%s/debug/pprof/{profile,heap,block}", flags.httpListen) - log.Infof("Profiling endpoint %s terminated: %v", flags.httpListen, http.ListenAndServe(flags.httpListen, nil)) - }() - } + maybeExportProfileData(flags) p.Start() defer p.Stop() diff --git a/scope b/scope index 5597b50da0..1caed6b031 100755 --- a/scope +++ b/scope @@ -163,6 +163,7 @@ launch_command() { echo docker run --privileged -d --name="$SCOPE_CONTAINER_NAME" --net=host --pid=host \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /var/run/scope/plugins:/var/run/scope/plugins \ + -v /sys/kernel/debug:/sys/kernel/debug \ -e CHECKPOINT_DISABLE \ $WEAVESCOPE_DOCKER_ARGS "$SCOPE_IMAGE" --probe.docker=true }