From 9920c4ea480d6e2bfbaa29ba84ac1dc943ff2883 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iago=20L=C3=B3pez=20Galeiras?=
Date: Wed, 8 Mar 2017 15:24:37 +0100
Subject: [PATCH] Add eBPF connection tracking without dependencies on kernel headers

Based on work from Lorenzo, updated by Iago, Alban, Alessandro and Michael.

This PR adds connection tracking using eBPF. The feature is not enabled by default. For now, you can enable it by launching Scope with the following command:

```
sudo ./scope launch --probe.ebpf.connections=true
```

This patch lets Scope get notified of every connection event without relying on parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/*, thereby improving performance.

We vendor https://github.com/iovisor/gobpf in Scope to load the pre-compiled eBPF program, and https://github.com/weaveworks/tcptracer-bpf to guess the offsets of the kernel structures we need. This way we don't need a different pre-compiled eBPF object file per kernel. The pre-compiled eBPF program is included in the vendoring of tcptracer-bpf.

The eBPF program uses kprobes/kretprobes on the following kernel functions:
- tcp_v4_connect
- tcp_v6_connect
- tcp_set_state
- inet_csk_accept
- tcp_close

It generates "connect", "accept" and "close" events containing the connection tuple as well as the pid and the netns. Note: IPv6 events are not supported in Scope and thus not passed on.

probe/endpoint/ebpf.go maintains the list of connections. Like conntrack, it keeps dead connections around for one iteration in order to report short-lived connections.

The code for parsing /proc/$pid/net/tcp{,6} and /proc/$pid/fd/* is still there and is still used at start-up, because eBPF only delivers events and not the initial state. However, the /proc parsing for the initial state is now done in the foreground instead of the background, via newForegroundReader().

NAT resolution on connections from eBPF works the same way as it did on connections from /proc: by using conntrack. One of the two conntrack instances is only started to get the initial state and is then stopped, since eBPF detects short-lived connections.
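For reviewers, here is a minimal standalone sketch of how the vendored tracer API is consumed. The callback signatures and the TcpV4/TcpV6 fields are the ones used by probe/endpoint/ebpf.go in this patch; the standalone main() and the log output are purely illustrative:

```
package main

import (
	"log"

	"github.com/weaveworks/tcptracer-bpf/pkg/tracer"
)

// onTCPv4 receives one callback per IPv4 TCP event (connect, accept or
// close). Each event carries the connection tuple plus the pid and netns,
// which is all Scope needs to build endpoint nodes without walking /proc.
func onTCPv4(e tracer.TcpV4) {
	log.Printf("%v pid=%d netns=%d %v:%d -> %v:%d",
		e.Type, e.Pid, e.NetNS, e.SAddr, e.SPort, e.DAddr, e.DPort)
}

// onTCPv6 drops IPv6 events, mirroring what this patch does in Scope.
func onTCPv6(e tracer.TcpV6) {}

func main() {
	// NewTracer loads the pre-compiled eBPF program, guesses the kernel
	// struct offsets and attaches the kprobes/kretprobes listed above.
	if _, err := tracer.NewTracer(onTCPv4, onTCPv6); err != nil {
		log.Fatalf("eBPF tracker unavailable: %v", err) // e.g. kernel < 4.4
	}
	select {} // keep consuming events until the process is killed
}
```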
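The initial state walked from /proc has no inherent direction, so it has to be normalized: conntrack's view wins when available, otherwise a port heuristic decides. The sketch below distills the condition shared by performWalkProc and feedInitialConnections; needsReverse is a hypothetical helper name, and the fourTuple stand-ins are reduced copies of probe/endpoint/four_tuple.go:

```
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Minimal stand-ins for the fourTuple type and its key() method added in
// probe/endpoint/four_tuple.go.
type fourTuple struct {
	fromAddr, toAddr string
	fromPort, toPort uint16
}

// key is direction-independent: both orientations of a connection map to
// the same string, so conntrack's view and the /proc view can be matched.
func (t fourTuple) key() string {
	k := []string{
		fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort),
		fmt.Sprintf("%s:%d", t.toAddr, t.toPort),
	}
	sort.Strings(k)
	return strings.Join(k, " ")
}

// needsReverse mirrors the condition in performWalkProc: trust conntrack's
// canonical direction when the tuple was already seen; otherwise assume the
// lower port belongs to the server, so a lower local port means the walked
// tuple points the wrong way and must be flipped.
func needsReverse(tuple fourTuple, seenTuples map[string]fourTuple) bool {
	if canonical, ok := seenTuples[tuple.key()]; ok {
		return canonical != tuple
	}
	return tuple.fromPort < tuple.toPort
}

func main() {
	// A tuple walked from /proc on the server side: local port 80, remote
	// ephemeral port. The heuristic flips it so the client is the source.
	walked := fourTuple{"10.0.0.1", "10.0.0.2", 80, 43210}
	fmt.Println(needsReverse(walked, nil)) // true
}
```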
The Scope Docker image size comparison: - weaveworks/scope in current master: 22 MB (compressed), 68 MB (uncompressed) - weaveworks/scope with this patchset: 23 MB (compressed), 69 MB (uncompressed) Fixes #1168 (walking /proc to obtain connections is very expensive) Fixes #1260 (Short-lived connections not tracked for containers in shared networking namespaces) Fixes #1962 (Port ebpf tracker to Go) Fixes #1961 (Remove runtime kernel header dependency from ebpf tracker) --- Makefile | 19 +- backend/Dockerfile | 9 +- probe/endpoint/connection_tracker.go | 250 ++++++++++++++++++ probe/endpoint/conntrack.go | 18 +- probe/endpoint/ebpf.go | 217 +++++++++++++++ probe/endpoint/ebpf_test.go | 184 +++++++++++++ probe/endpoint/four_tuple.go | 45 ++++ probe/endpoint/nat.go | 2 +- probe/endpoint/nat_internal_test.go | 4 +- ...ground_reader_linux.go => reader_linux.go} | 94 +++++-- probe/endpoint/procspy/spy_darwin.go | 5 + probe/endpoint/procspy/spy_linux.go | 12 +- probe/endpoint/reporter.go | 158 ++--------- prog/main.go | 2 + prog/probe.go | 6 +- render/filters.go | 41 ++- render/process.go | 2 +- scope | 1 + 18 files changed, 878 insertions(+), 191 deletions(-) create mode 100644 probe/endpoint/connection_tracker.go create mode 100644 probe/endpoint/ebpf.go create mode 100644 probe/endpoint/ebpf_test.go create mode 100644 probe/endpoint/four_tuple.go rename probe/endpoint/procspy/{background_reader_linux.go => reader_linux.go} (77%) diff --git a/Makefile b/Makefile index c371c333b5..684f21d5f0 100644 --- a/Makefile +++ b/Makefile @@ -24,13 +24,24 @@ RM=--rm RUN_FLAGS=-ti BUILD_IN_CONTAINER=true GO_ENV=GOGC=off -GO=env $(GO_ENV) go -NO_CROSS_COMP=unset GOOS GOARCH -GO_HOST=$(NO_CROSS_COMP); $(GO) -WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV) GO_BUILD_INSTALL_DEPS=-i GO_BUILD_TAGS='netgo unsafe' GO_BUILD_FLAGS=$(GO_BUILD_INSTALL_DEPS) -ldflags "-extldflags \"-static\" -X main.version=$(SCOPE_VERSION) -s -w" -tags $(GO_BUILD_TAGS) +GOOS=$(shell go tool dist env | grep GOOS | sed -e 's/GOOS="\(.*\)"/\1/') + +ifeq ($(GOOS),linux) +GO_ENV+=CGO_ENABLED=1 +endif + +ifeq ($(GOARCH),arm) +ARM_CC=CC=/usr/bin/arm-linux-gnueabihf-gcc +endif + +GO=env $(GO_ENV) $(ARM_CC) go + +NO_CROSS_COMP=unset GOOS GOARCH +GO_HOST=$(NO_CROSS_COMP); env $(GO_ENV) go +WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV) IMAGE_TAG=$(shell ./tools/image-tag) all: $(SCOPE_EXPORT) diff --git a/backend/Dockerfile b/backend/Dockerfile index 128366fcb6..eb8aa20fdc 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -1,6 +1,9 @@ -FROM golang:1.7.4 +FROM ubuntu:yakkety +ENV GOPATH /go +ENV GOVERSION 1.7 +ENV PATH /go/bin:/usr/lib/go-${GOVERSION}/bin:/usr/bin:/bin:/usr/sbin:/sbin RUN apt-get update && \ - apt-get install -y libpcap-dev python-requests time file shellcheck && \ + apt-get install -y libpcap-dev python-requests time file shellcheck golang-${GOVERSION} git gcc-arm-linux-gnueabihf && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN go clean -i net && \ go install -tags netgo std && \ @@ -13,7 +16,7 @@ RUN go get -tags netgo \ github.com/fatih/hclfmt \ github.com/mjibson/esc \ github.com/client9/misspell/cmd/misspell && \ - chmod a+wr --recursive /usr/local/go/pkg && \ + chmod a+wr --recursive /usr/lib/go-${GOVERSION}/pkg && \ rm -rf /go/pkg/ /go/src/ COPY build.sh / ENTRYPOINT ["/build.sh"] diff --git a/probe/endpoint/connection_tracker.go b/probe/endpoint/connection_tracker.go new file mode 100644 index 0000000000..3479c318b1 --- /dev/null +++ b/probe/endpoint/connection_tracker.go @@ -0,0 +1,250 @@ +package endpoint + 
+import ( + "strconv" + + log "github.com/Sirupsen/logrus" + "github.com/weaveworks/scope/probe/endpoint/procspy" + "github.com/weaveworks/scope/probe/process" + "github.com/weaveworks/scope/report" +) + +// connectionTrackerConfig holds the config options for the endpoint tracker. +type connectionTrackerConfig struct { + HostID string + HostName string + SpyProcs bool + UseConntrack bool + WalkProc bool + UseEbpfConn bool + ProcRoot string + BufferSize int + Scanner procspy.ConnectionScanner + DNSSnooper *DNSSnooper +} + +type connectionTracker struct { + conf connectionTrackerConfig + flowWalker flowWalker // Interface + ebpfTracker eventTracker + reverseResolver *reverseResolver + processCache *process.CachingWalker +} + +func newConnectionTracker(conf connectionTrackerConfig) connectionTracker { + if !conf.UseEbpfConn { + // eBPF is off, use the flowWalker + return connectionTracker{ + conf: conf, + flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat"), + ebpfTracker: nil, + reverseResolver: newReverseResolver(), + } + } + // Once eBPF is enabled by default, check that it starts correctly and otherwise fall back to the flowWalker + et, err := newEbpfTracker(conf.UseEbpfConn) + if err != nil { + // TODO: fall back to the flowWalker once eBPF is enabled by default + log.Errorf("Error setting up the ebpfTracker, connections will not be reported: %s", err) + noopConnectionTracker := connectionTracker{ + conf: conf, + flowWalker: nil, + ebpfTracker: nil, + reverseResolver: nil, + } + return noopConnectionTracker + } + + var processCache *process.CachingWalker + processCache = process.NewCachingWalker(process.NewWalker(conf.ProcRoot)) + processCache.Tick() + + ct := connectionTracker{ + conf: conf, + flowWalker: nil, + ebpfTracker: et, + reverseResolver: newReverseResolver(), + processCache: processCache, + } + go ct.getInitialState() + return ct +} + +func flowToTuple(f flow) (ft fourTuple) { + ft = fourTuple{ + f.Original.Layer3.SrcIP, + f.Original.Layer3.DstIP, + uint16(f.Original.Layer4.SrcPort), + uint16(f.Original.Layer4.DstPort), + } + // Handle DNAT-ed connections in the initial state + if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP { + ft = fourTuple{ + f.Reply.Layer3.DstIP, + f.Reply.Layer3.SrcIP, + uint16(f.Reply.Layer4.DstPort), + uint16(f.Reply.Layer4.SrcPort), + } + } + return ft +} + +// ReportConnections calls the trackers according to the configuration. +// When eBPF is enabled, only performEbpfTrack() is called. +func (t *connectionTracker) ReportConnections(rpt *report.Report) { + hostNodeID := report.MakeHostNodeID(t.conf.HostID) + + if t.ebpfTracker != nil { + t.performEbpfTrack(rpt, hostNodeID) + return + } + + // seenTuples contains the connections seen by conntrack; it is passed on to the /proc parser + seenTuples := map[string]fourTuple{} + if t.flowWalker != nil { + t.performFlowWalk(rpt, &seenTuples) + } + // If eBPF was enabled but failed to initialize, Scanner will be nil. + // We can't recover from this, so don't walk proc in that case.
+ // TODO: implement fallback + if t.conf.WalkProc && t.conf.Scanner != nil { + t.performWalkProc(rpt, hostNodeID, &seenTuples) + } +} + +func (t *connectionTracker) performFlowWalk(rpt *report.Report, seenTuples *map[string]fourTuple) { + // Consult the flowWalker for short-lived connections + extraNodeInfo := map[string]string{ + Conntracked: "true", + } + t.flowWalker.walkFlows(func(f flow, alive bool) { + tuple := flowToTuple(f) + (*seenTuples)[tuple.key()] = tuple + t.addConnection(rpt, tuple, "", extraNodeInfo, extraNodeInfo) + }) +} + +func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID string, seenTuples *map[string]fourTuple) error { + conns, err := t.conf.Scanner.Connections(t.conf.SpyProcs) + if err != nil { + return err + } + for conn := conns.Next(); conn != nil; conn = conns.Next() { + var ( + namespaceID string + tuple = fourTuple{ + conn.LocalAddress.String(), + conn.RemoteAddress.String(), + conn.LocalPort, + conn.RemotePort, + } + toNodeInfo = map[string]string{Procspied: "true"} + fromNodeInfo = map[string]string{Procspied: "true"} + ) + if conn.Proc.PID > 0 { + fromNodeInfo[process.PID] = strconv.FormatUint(uint64(conn.Proc.PID), 10) + fromNodeInfo[report.HostNodeID] = hostNodeID + } + + if conn.Proc.NetNamespaceID > 0 { + namespaceID = strconv.FormatUint(conn.Proc.NetNamespaceID, 10) + } + + // If we've already seen this connection, we should know the direction + // (or have already figured it out), so we normalize and use the + // canonical direction. Otherwise, we can use a port-heuristic to guess + // the direction. + canonical, ok := (*seenTuples)[tuple.key()] + if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) { + tuple.reverse() + toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo + } + t.addConnection(rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) + } + return nil +} + +func (t *connectionTracker) getInitialState() { + scanner := procspy.NewSyncConnectionScanner(t.processCache) + // Run conntrack and proc parsing synchronously only once to initialize ebpfTracker + seenTuples := map[string]fourTuple{} + // Consult the flowWalker to get the initial state + if err := IsConntrackSupported(t.conf.ProcRoot); t.conf.UseConntrack && err != nil { + log.Warnf("Not using conntrack: not supported by the kernel: %s", err) + } else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil { + log.Errorf("conntrack existingConnections error: %v", err) + } else { + for _, f := range existingFlows { + tuple := flowToTuple(f) + seenTuples[tuple.key()] = tuple + } + } + + conns, err := scanner.Connections(t.conf.SpyProcs) + if err != nil { + log.Errorf("Error initializing ebpfTracker while scanning /proc, continuing without initial connections: %s", err) + } + scanner.Stop() + + t.ebpfTracker.feedInitialConnections(conns, seenTuples, report.MakeHostNodeID(t.conf.HostID)) +} + +func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error { + t.ebpfTracker.walkConnections(func(e ebpfConnection) { + fromNodeInfo := map[string]string{ + EBPF: "true", + } + toNodeInfo := map[string]string{ + EBPF: "true", + } + if e.pid > 0 { + fromNodeInfo[process.PID] = strconv.Itoa(e.pid) + fromNodeInfo[report.HostNodeID] = hostNodeID + } + + if e.incoming { + t.addConnection(rpt, reverse(e.tuple), e.networkNamespace, toNodeInfo, fromNodeInfo) + } else { + t.addConnection(rpt, e.tuple, e.networkNamespace, fromNodeInfo, toNodeInfo) + } + + }) + return nil +} + +func (t *connectionTracker) 
addConnection(rpt *report.Report, ft fourTuple, namespaceID string, extraFromNode, extraToNode map[string]string) { + var ( + fromNode = t.makeEndpointNode(namespaceID, ft.fromAddr, ft.fromPort, extraFromNode) + toNode = t.makeEndpointNode(namespaceID, ft.toAddr, ft.toPort, extraToNode) + ) + rpt.Endpoint = rpt.Endpoint.AddNode(fromNode.WithEdge(toNode.ID, report.EdgeMetadata{})) + rpt.Endpoint = rpt.Endpoint.AddNode(toNode) +} + +func (t *connectionTracker) makeEndpointNode(namespaceID string, addr string, port uint16, extra map[string]string) report.Node { + portStr := strconv.Itoa(int(port)) + node := report.MakeNodeWith( + report.MakeEndpointNodeID(t.conf.HostID, namespaceID, addr, portStr), + map[string]string{Addr: addr, Port: portStr}) + if names := t.conf.DNSSnooper.CachedNamesForIP(addr); len(names) > 0 { + node = node.WithSet(SnoopedDNSNames, report.MakeStringSet(names...)) + } + if names, err := t.reverseResolver.get(addr); err == nil && len(names) > 0 { + node = node.WithSet(ReverseDNSNames, report.MakeStringSet(names...)) + } + if extra != nil { + node = node.WithLatests(extra) + } + return node +} + +func (t *connectionTracker) Stop() error { + if t.ebpfTracker != nil { + t.ebpfTracker.stop() + } + if t.flowWalker != nil { + t.flowWalker.stop() + } + t.reverseResolver.stop() + return nil +} diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 3f4711ad73..bc79661a43 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -65,14 +65,14 @@ type conntrack struct { // flowWalker is something that maintains flows, and provides an accessor // method to walk them. type flowWalker interface { - walkFlows(f func(flow)) + walkFlows(f func(f flow, active bool)) stop() } type nilFlowWalker struct{} -func (n nilFlowWalker) stop() {} -func (n nilFlowWalker) walkFlows(f func(flow)) {} +func (n nilFlowWalker) stop() {} +func (n nilFlowWalker) walkFlows(f func(flow, bool)) {} // conntrackWalker uses the conntrack command to track network connections and // implement flowWalker. @@ -160,7 +160,7 @@ func logPipe(prefix string, reader io.Reader) { func (c *conntrackWalker) run() { // Fork another conntrack, just to capture existing connections // for which we don't get events - existingFlows, err := c.existingConnections() + existingFlows, err := existingConnections(c.args) if err != nil { log.Errorf("conntrack existingConnections error: %v", err) return @@ -354,8 +354,8 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) { return f, nil } -func (c *conntrackWalker) existingConnections() ([]flow, error) { - args := append([]string{"-L", "-o", "id", "-p", "tcp"}, c.args...) +func existingConnections(conntrackWalkerArgs []string) ([]flow, error) { + args := append([]string{"-L", "-o", "id", "-p", "tcp"}, conntrackWalkerArgs...) cmd := exec.Command("conntrack", args...) 
stdout, err := cmd.StdoutPipe() if err != nil { @@ -463,14 +463,14 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { // walkFlows calls f with all active flows and flows that have come and gone // since the last call to walkFlows -func (c *conntrackWalker) walkFlows(f func(flow)) { +func (c *conntrackWalker) walkFlows(f func(flow, bool)) { c.Lock() defer c.Unlock() for _, flow := range c.activeFlows { - f(flow) + f(flow, true) } for _, flow := range c.bufferedFlows { - f(flow) + f(flow, false) } c.bufferedFlows = c.bufferedFlows[:0] } diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go new file mode 100644 index 0000000000..bed984d425 --- /dev/null +++ b/probe/endpoint/ebpf.go @@ -0,0 +1,217 @@ +package endpoint + +import ( + "errors" + "fmt" + "regexp" + "strconv" + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/weaveworks/scope/probe/endpoint/procspy" + "github.com/weaveworks/scope/probe/host" + "github.com/weaveworks/tcptracer-bpf/pkg/tracer" +) + +// An ebpfConnection represents a TCP connection +type ebpfConnection struct { + tuple fourTuple + networkNamespace string + incoming bool + pid int +} + +type eventTracker interface { + handleConnection(ev tracer.EventType, tuple fourTuple, pid int, networkNamespace string) + walkConnections(f func(ebpfConnection)) + feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) + isReadyToHandleConnections() bool + stop() +} + +var ebpfTracker *EbpfTracker + +// EbpfTracker contains the sets of open and closed TCP connections. +// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`. +type EbpfTracker struct { + sync.Mutex + tracer *tracer.Tracer + readyToHandleConnections bool + dead bool + + openConnections map[string]ebpfConnection + closedConnections []ebpfConnection +} + +var releaseRegex = regexp.MustCompile(`^(\d+)\.(\d+).*$`) + +func isKernelSupported() error { + release, _, err := host.GetKernelReleaseAndVersion() + if err != nil { + return err + } + + releaseParts := releaseRegex.FindStringSubmatch(release) + if len(releaseParts) != 3 { + return fmt.Errorf("got invalid release version %q (expected format '4.4[.2-1]')", release) + } + + major, err := strconv.Atoi(releaseParts[1]) + if err != nil { + return err + } + + minor, err := strconv.Atoi(releaseParts[2]) + if err != nil { + return err + } + + if major > 4 { + return nil + } + + if major < 4 || minor < 4 { + return fmt.Errorf("got kernel %s but need kernel >=4.4", release) + } + + return nil +} + +func newEbpfTracker(useEbpfConn bool) (eventTracker, error) { + if !useEbpfConn { + return nil, errors.New("ebpf tracker not enabled") + } + + if err := isKernelSupported(); err != nil { + return nil, fmt.Errorf("kernel not supported: %v", err) + } + + t, err := tracer.NewTracer(tcpEventCbV4, tcpEventCbV6) + if err != nil { + return nil, err + } + + tracker := &EbpfTracker{ + openConnections: map[string]ebpfConnection{}, + tracer: t, + } + + ebpfTracker = tracker + return tracker, nil +} + +var lastTimestampV4 uint64 + +func tcpEventCbV4(e tracer.TcpV4) { + if lastTimestampV4 > e.Timestamp { + log.Errorf("ERROR: late event!\n") + } + + lastTimestampV4 = e.Timestamp + + tuple := fourTuple{e.SAddr.String(), e.DAddr.String(), e.SPort, e.DPort} + ebpfTracker.handleConnection(e.Type, tuple, int(e.Pid), strconv.Itoa(int(e.NetNS))) +} + +func tcpEventCbV6(e tracer.TcpV6) { + // TODO: IPv6 not supported in Scope +} + +func (t *EbpfTracker) handleConnection(ev tracer.EventType, 
tuple fourTuple, pid int, networkNamespace string) { + t.Lock() + defer t.Unlock() + + if !t.isReadyToHandleConnections() { + return + } + + log.Debugf("handleConnection(%v, [%v:%v --> %v:%v], pid=%v, netNS=%v)", + ev, tuple.fromAddr, tuple.fromPort, tuple.toAddr, tuple.toPort, pid, networkNamespace) + + switch ev { + case tracer.EventConnect: + conn := ebpfConnection{ + incoming: false, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case tracer.EventAccept: + conn := ebpfConnection{ + incoming: true, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case tracer.EventClose: + if deadConn, ok := t.openConnections[tuple.String()]; ok { + delete(t.openConnections, tuple.String()) + t.closedConnections = append(t.closedConnections, deadConn) + } else { + log.Debugf("EbpfTracker: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace) + } + } +} + +// walkConnections calls f with all open connections and connections that have come and gone +// since the last call to walkConnections +func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) { + t.Lock() + defer t.Unlock() + + for _, connection := range t.openConnections { + f(connection) + } + for _, connection := range t.closedConnections { + f(connection) + } + t.closedConnections = t.closedConnections[:0] +} + +func (t *EbpfTracker) feedInitialConnections(ci procspy.ConnIter, seenTuples map[string]fourTuple, hostNodeID string) { + t.readyToHandleConnections = true + for conn := ci.Next(); conn != nil; conn = ci.Next() { + var ( + namespaceID string + tuple = fourTuple{ + conn.LocalAddress.String(), + conn.RemoteAddress.String(), + conn.LocalPort, + conn.RemotePort, + } + ) + + if conn.Proc.NetNamespaceID > 0 { + namespaceID = strconv.FormatUint(conn.Proc.NetNamespaceID, 10) + } + + // We can use a port-heuristic to guess the direction. + // We assume that tuple.fromPort < tuple.toPort is a connect event (outgoing) + canonical, ok := seenTuples[tuple.key()] + if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) { + t.handleConnection(tracer.EventConnect, tuple, int(conn.Proc.PID), namespaceID) + } else { + t.handleConnection(tracer.EventAccept, tuple, int(conn.Proc.PID), namespaceID) + } + } +} + +func (t *EbpfTracker) isReadyToHandleConnections() bool { + return t.readyToHandleConnections +} + +func (t *EbpfTracker) stop() { + // TODO: implement proper stopping logic + // + // Even if we stop the goroutine, it's not enough since we disabled the + // async proc parser. We leave this unimplemented for now because: + // + // * eBPF tracking is optional (it needs to be enabled explicitly with + // --probe.ebpf.connections=true); if users enable it, we assume they + // check the logs to see whether it works + // + // * It's unlikely that the eBPF tracker stops working once it has started + // successfully, and if it does, we probably want it to fail hard +} diff --git a/probe/endpoint/ebpf_test.go b/probe/endpoint/ebpf_test.go new file mode 100644 index 0000000000..e813ec2e8c --- /dev/null +++ b/probe/endpoint/ebpf_test.go @@ -0,0 +1,184 @@ +package endpoint + +import ( + "net" + "reflect" + "strconv" + "testing" + + "github.com/weaveworks/tcptracer-bpf/pkg/tracer" +) + +func TestHandleConnection(t *testing.T) { + var ( + ServerPid uint32 = 42 + ClientPid uint32 = 43 + ServerIP = net.ParseIP("127.0.0.1") + ClientIP = net.ParseIP("127.0.0.2") + ServerPort uint16 = 12345 + ClientPort uint16 = 6789 + NetNS uint32 = 123456789 + + IPv4ConnectEvent = tracer.TcpV4{ + CPU: 0, + Type: tracer.EventConnect, + Pid: ClientPid, + Comm: "cmd", + SAddr: ClientIP, + DAddr: ServerIP, + SPort: ClientPort, + DPort: ServerPort, + NetNS: NetNS, + } + + IPv4ConnectEbpfConnection = ebpfConnection{ + tuple: fourTuple{ + fromAddr: ClientIP.String(), + toAddr: ServerIP.String(), + fromPort: ClientPort, + toPort: ServerPort, + }, + networkNamespace: strconv.Itoa(int(NetNS)), + incoming: false, + pid: int(ClientPid), + } + + IPv4ConnectCloseEvent = tracer.TcpV4{ + CPU: 0, + Type: tracer.EventClose, + Pid: ClientPid, + Comm: "cmd", + SAddr: ClientIP, + DAddr: ServerIP, + SPort: ClientPort, + DPort: ServerPort, + NetNS: NetNS, + } + + IPv4AcceptEvent = tracer.TcpV4{ + CPU: 0, + Type: tracer.EventAccept, + Pid: ServerPid, + Comm: "cmd", + SAddr: ServerIP, + DAddr: ClientIP, + SPort: ServerPort, + DPort: ClientPort, + NetNS: NetNS, + } + + IPv4AcceptEbpfConnection = ebpfConnection{ + tuple: fourTuple{ + fromAddr: ServerIP.String(), + toAddr: ClientIP.String(), + fromPort: ServerPort, + toPort: ClientPort, + }, + networkNamespace: strconv.Itoa(int(NetNS)), + incoming: true, + pid: int(ServerPid), + } + + IPv4AcceptCloseEvent = tracer.TcpV4{ + CPU: 0, + Type: tracer.EventClose, + Pid: ClientPid, + Comm: "cmd", + SAddr: ServerIP, + DAddr: ClientIP, + SPort: ServerPort, + DPort: ClientPort, + NetNS: NetNS, + } + ) + + mockEbpfTracker := &EbpfTracker{ + readyToHandleConnections: true, + dead: false, + + openConnections: map[string]ebpfConnection{}, + closedConnections: []ebpfConnection{}, + } + + tuple := fourTuple{IPv4ConnectEvent.SAddr.String(), IPv4ConnectEvent.DAddr.String(), uint16(IPv4ConnectEvent.SPort), uint16(IPv4ConnectEvent.DPort)} + mockEbpfTracker.handleConnection(IPv4ConnectEvent.Type, tuple, int(IPv4ConnectEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectEvent.NetNS), 10)) + if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4ConnectEbpfConnection) { + t.Errorf("Connection mismatch connect event\nTarget connection:%v\nParsed connection:%v", + IPv4ConnectEbpfConnection, mockEbpfTracker.openConnections[tuple.String()]) + } + + tuple = fourTuple{IPv4ConnectCloseEvent.SAddr.String(), IPv4ConnectCloseEvent.DAddr.String(), uint16(IPv4ConnectCloseEvent.SPort), uint16(IPv4ConnectCloseEvent.DPort)} + mockEbpfTracker.handleConnection(IPv4ConnectCloseEvent.Type, tuple, int(IPv4ConnectCloseEvent.Pid), strconv.FormatUint(uint64(IPv4ConnectCloseEvent.NetNS), 10)) + if len(mockEbpfTracker.openConnections) != 0 { + t.Errorf("Connection mismatch close event\nConnection to close:%v",
mockEbpfTracker.openConnections[tuple.String()]) + } + + mockEbpfTracker = &EbpfTracker{ + readyToHandleConnections: true, + dead: false, + + openConnections: map[string]ebpfConnection{}, + closedConnections: []ebpfConnection{}, + } + + tuple = fourTuple{IPv4AcceptEvent.SAddr.String(), IPv4AcceptEvent.DAddr.String(), uint16(IPv4AcceptEvent.SPort), uint16(IPv4AcceptEvent.DPort)} + mockEbpfTracker.handleConnection(IPv4AcceptEvent.Type, tuple, int(IPv4AcceptEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptEvent.NetNS), 10)) + if !reflect.DeepEqual(mockEbpfTracker.openConnections[tuple.String()], IPv4AcceptEbpfConnection) { + t.Errorf("Connection mismatch accept event\nTarget connection:%v\nParsed connection:%v", + IPv4AcceptEbpfConnection, mockEbpfTracker.openConnections[tuple.String()]) + } + + tuple = fourTuple{IPv4AcceptCloseEvent.SAddr.String(), IPv4AcceptCloseEvent.DAddr.String(), uint16(IPv4AcceptCloseEvent.SPort), uint16(IPv4AcceptCloseEvent.DPort)} + mockEbpfTracker.handleConnection(IPv4AcceptCloseEvent.Type, tuple, int(IPv4AcceptCloseEvent.Pid), strconv.FormatUint(uint64(IPv4AcceptCloseEvent.NetNS), 10)) + + if len(mockEbpfTracker.openConnections) != 0 { + t.Errorf("Connection mismatch close event\nConnection to close:%v", + mockEbpfTracker.openConnections) + } +} + +func TestWalkConnections(t *testing.T) { + var ( + cnt int + activeTuple = fourTuple{ + fromAddr: "", + toAddr: "", + fromPort: 0, + toPort: 0, + } + + inactiveTuple = fourTuple{ + fromAddr: "", + toAddr: "", + fromPort: 0, + toPort: 0, + } + ) + mockEbpfTracker := &EbpfTracker{ + readyToHandleConnections: true, + dead: false, + openConnections: map[string]ebpfConnection{ + activeTuple.String(): { + tuple: activeTuple, + networkNamespace: "12345", + incoming: true, + pid: 0, + }, + }, + closedConnections: []ebpfConnection{ + { + tuple: inactiveTuple, + networkNamespace: "12345", + incoming: false, + pid: 0, + }, + }, + } + mockEbpfTracker.walkConnections(func(e ebpfConnection) { + cnt++ + }) + if cnt != 2 { + t.Errorf("walkConnections found %v instead of 2 connections", cnt) + } +} diff --git a/probe/endpoint/four_tuple.go b/probe/endpoint/four_tuple.go new file mode 100644 index 0000000000..d322d1ddc2 --- /dev/null +++ b/probe/endpoint/four_tuple.go @@ -0,0 +1,45 @@ +package endpoint + +import ( + "fmt" + "sort" + "strings" +) + +// fourTuple is an (IP, port, IP, port) tuple representing a connection +// from fromAddr:fromPort to toAddr:toPort (see also flow in +// conntrack.go) +type fourTuple struct { + fromAddr, toAddr string + fromPort, toPort uint16 +} + +func (t fourTuple) String() string { + return fmt.Sprintf("%s:%d-%s:%d", t.fromAddr, t.fromPort, t.toAddr, t.toPort) +} + +// key is a sortable direction-independent key for tuples, used to look up a +// fourTuple when you are unsure of its direction.
+func (t fourTuple) key() string { + key := []string{ + fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), + fmt.Sprintf("%s:%d", t.toAddr, t.toPort), + } + sort.Strings(key) + return strings.Join(key, " ") +} + +// reverse flips the direction of the tuple +func (t *fourTuple) reverse() { + t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort +} + +// reverse flips the direction of a tuple, without side effects +func reverse(tuple fourTuple) fourTuple { + return fourTuple{ + fromAddr: tuple.toAddr, + toAddr: tuple.fromAddr, + fromPort: tuple.toPort, + toPort: tuple.fromPort, + } +} diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 88c9e8d696..489de36dad 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -49,7 +49,7 @@ func toMapping(f flow) *endpointMapping { // applyNAT duplicates Nodes in the endpoint topology of a report, based on // the NAT table. func (n natMapper) applyNAT(rpt report.Report, scope string) { - n.flowWalker.walkFlows(func(f flow) { + n.flowWalker.walkFlows(func(f flow, active bool) { mapping := toMapping(f) realEndpointPort := strconv.Itoa(mapping.originalPort) diff --git a/probe/endpoint/nat_internal_test.go b/probe/endpoint/nat_internal_test.go index 2214be5f44..efedcf1858 100644 --- a/probe/endpoint/nat_internal_test.go +++ b/probe/endpoint/nat_internal_test.go @@ -13,9 +13,9 @@ type mockFlowWalker struct { flows []flow } -func (m *mockFlowWalker) walkFlows(f func(flow)) { +func (m *mockFlowWalker) walkFlows(f func(f flow, active bool)) { for _, flow := range m.flows { - f(flow) + f(flow, true) } } diff --git a/probe/endpoint/procspy/background_reader_linux.go b/probe/endpoint/procspy/reader_linux.go similarity index 77% rename from probe/endpoint/procspy/background_reader_linux.go rename to probe/endpoint/procspy/reader_linux.go index d618998a60..359ca08b5e 100644 --- a/probe/endpoint/procspy/background_reader_linux.go +++ b/probe/endpoint/procspy/reader_linux.go @@ -20,6 +20,11 @@ const ( targetWalkTime = 10 * time.Second // Aim at walking all files in 10 seconds ) +type reader interface { + getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) + stop() +} + type backgroundReader struct { stopc chan struct{} mtx sync.Mutex @@ -29,7 +34,7 @@ type backgroundReader struct { // starts a rate-limited background goroutine to read the expensive files from // proc. 
-func newBackgroundReader(walker process.Walker) *backgroundReader { +func newBackgroundReader(walker process.Walker) reader { br := &backgroundReader{ stopc: make(chan struct{}), latestSockets: map[uint64]*Proc{}, @@ -54,28 +59,6 @@ func (br *backgroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Pro return br.latestSockets, err } -type walkResult struct { - buf *bytes.Buffer - sockets map[uint64]*Proc -} - -func performWalk(w pidWalker, c chan<- walkResult) { - var ( - err error - result = walkResult{ - buf: bytes.NewBuffer(make([]byte, 0, 5000)), - } - ) - - result.sockets, err = w.walk(result.buf) - if err != nil { - log.Errorf("background /proc reader: error walking /proc: %s", err) - result.buf.Reset() - result.sockets = nil - } - c <- result -} - func (br *backgroundReader) loop(walker process.Walker) { var ( begin time.Time // when we started the last performWalk @@ -120,6 +103,71 @@ func (br *backgroundReader) loop(walker process.Walker) { } } +type foregroundReader struct { + stopc chan struct{} + latestBuf *bytes.Buffer + latestSockets map[uint64]*Proc + ticker *time.Ticker +} + +// newForegroundReader reads files from /proc synchronously, once +func newForegroundReader(walker process.Walker) reader { + fr := &foregroundReader{ + stopc: make(chan struct{}), + latestSockets: map[uint64]*Proc{}, + } + var ( + walkc = make(chan walkResult) + ticker = time.NewTicker(time.Millisecond) // fire every millisecond + pWalker = newPidWalker(walker, ticker.C, fdBlockSize) + ) + + go performWalk(pWalker, walkc) + + result := <-walkc + fr.latestBuf = result.buf + fr.latestSockets = result.sockets + fr.ticker = ticker + + return fr +} + +func (fr *foregroundReader) stop() { + fr.ticker.Stop() + close(fr.stopc) +} + +func (fr *foregroundReader) getWalkedProcPid(buf *bytes.Buffer) (map[uint64]*Proc, error) { + // Don't read latestBuf directly but copy it via a new reader so that + // the buffer is not drained and can be copied again on the next call + // of getWalkedProcPid. + _, err := io.Copy(buf, bytes.NewReader(fr.latestBuf.Bytes())) + + return fr.latestSockets, err +} + +type walkResult struct { + buf *bytes.Buffer + sockets map[uint64]*Proc +} + +func performWalk(w pidWalker, c chan<- walkResult) { + var ( + err error + result = walkResult{ + buf: bytes.NewBuffer(make([]byte, 0, 5000)), + } + ) + + result.sockets, err = w.walk(result.buf) + if err != nil { + log.Errorf("background /proc reader: error walking /proc: %s", err) + result.buf.Reset() + result.sockets = nil + } + c <- result +} + // Adjust rate limit for next walk and calculate when it should be started func scheduleNextWalk(rateLimitPeriod time.Duration, took time.Duration) (newRateLimitPeriod time.Duration, restInterval time.Duration) { log.Debugf("background /proc reader: full pass took %s", took) diff --git a/probe/endpoint/procspy/spy_darwin.go b/probe/endpoint/procspy/spy_darwin.go index bec6f2a132..394407f39f 100644 --- a/probe/endpoint/procspy/spy_darwin.go +++ b/probe/endpoint/procspy/spy_darwin.go @@ -18,6 +18,11 @@ func NewConnectionScanner(_ process.Walker) ConnectionScanner { return &darwinScanner{} } +// NewSyncConnectionScanner creates a new synchronous Darwin ConnectionScanner +func NewSyncConnectionScanner(_ process.Walker) ConnectionScanner { + return &darwinScanner{} +} + type darwinScanner struct{} // Connections returns all established (TCP) connections.
No need to be root diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 6966f852f9..ec668a47d7 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -38,8 +38,14 @@ func NewConnectionScanner(walker process.Walker) ConnectionScanner { return &linuxScanner{br} } +// NewSyncConnectionScanner creates a new synchronous Linux ConnectionScanner +func NewSyncConnectionScanner(walker process.Walker) ConnectionScanner { + fr := newForegroundReader(walker) + return &linuxScanner{fr} +} + type linuxScanner struct { - br *backgroundReader + r reader } func (s *linuxScanner) Connections(processes bool) (ConnIter, error) { @@ -50,7 +56,7 @@ func (s *linuxScanner) Connections(processes bool) (ConnIter, error) { var procs map[uint64]*Proc if processes { var err error - if procs, err = s.br.getWalkedProcPid(buf); err != nil { + if procs, err = s.r.getWalkedProcPid(buf); err != nil { return nil, err } } @@ -68,5 +74,5 @@ func (s *linuxScanner) Connections(processes bool) (ConnIter, error) { } func (s *linuxScanner) Stop() { - s.br.stop() + s.r.stop() } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index f58fce91a1..7f0301be37 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,16 +1,10 @@ package endpoint import ( - "fmt" - "sort" - "strconv" - "strings" "time" "github.com/prometheus/client_golang/prometheus" - "github.com/weaveworks/scope/probe/endpoint/procspy" - "github.com/weaveworks/scope/probe/process" "github.com/weaveworks/scope/report" ) @@ -19,6 +13,7 @@ const ( Addr = "addr" // typically IPv4 Port = "port" Conntracked = "conntracked" + EBPF = "eBPF" Procspied = "procspied" ReverseDNSNames = "reverse_dns_names" SnoopedDNSNames = "snooped_dns_names" @@ -31,6 +26,7 @@ type ReporterConfig struct { SpyProcs bool UseConntrack bool WalkProc bool + UseEbpfConn bool ProcRoot string BufferSize int Scanner procspy.ConnectionScanner @@ -39,10 +35,9 @@ type ReporterConfig struct { // Reporter generates Reports containing the Endpoint topology. type Reporter struct { - conf ReporterConfig - flowWalker flowWalker // interface - natMapper natMapper - reverseResolver *reverseResolver + conf ReporterConfig + connectionTracker connectionTracker + natMapper natMapper } // SpyDuration is an exported prometheus metric @@ -64,10 +59,20 @@ var SpyDuration = prometheus.NewSummaryVec( // with process (PID) information. 
func NewReporter(conf ReporterConfig) *Reporter { return &Reporter{ - conf: conf, - flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize), - natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")), - reverseResolver: newReverseResolver(), + conf: conf, + connectionTracker: newConnectionTracker(connectionTrackerConfig{ + HostID: conf.HostID, + HostName: conf.HostName, + SpyProcs: conf.SpyProcs, + UseConntrack: conf.UseConntrack, + WalkProc: conf.WalkProc, + UseEbpfConn: conf.UseEbpfConn, + ProcRoot: conf.ProcRoot, + BufferSize: conf.BufferSize, + Scanner: conf.Scanner, + DNSSnooper: conf.DNSSnooper, + }), + natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")), } } @@ -76,141 +81,20 @@ func (Reporter) Name() string { return "Endpoint" } // Stop stop stop func (r *Reporter) Stop() { - r.flowWalker.stop() + r.connectionTracker.Stop() r.natMapper.stop() - r.reverseResolver.stop() r.conf.Scanner.Stop() } -type fourTuple struct { - fromAddr, toAddr string - fromPort, toPort uint16 -} - -// key is a sortable direction-independent key for tuples, used to look up a -// fourTuple, when you are unsure of it's direction. -func (t fourTuple) key() string { - key := []string{ - fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), - fmt.Sprintf("%s:%d", t.toAddr, t.toPort), - } - sort.Strings(key) - return strings.Join(key, " ") -} - -// reverse flips the direction of the tuple -func (t *fourTuple) reverse() { - t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort -} - // Report implements Reporter. func (r *Reporter) Report() (report.Report, error) { defer func(begin time.Time) { SpyDuration.WithLabelValues().Observe(time.Since(begin).Seconds()) }(time.Now()) - hostNodeID := report.MakeHostNodeID(r.conf.HostID) rpt := report.MakeReport() - seenTuples := map[string]fourTuple{} - - // Consult the flowWalker for short-lived connections - { - extraNodeInfo := map[string]string{ - Conntracked: "true", - } - r.flowWalker.walkFlows(func(f flow) { - tuple := fourTuple{ - f.Original.Layer3.SrcIP, - f.Original.Layer3.DstIP, - uint16(f.Original.Layer4.SrcPort), - uint16(f.Original.Layer4.DstPort), - } - // Handle DNAT-ed short-lived connections. - // The NAT mapper won't help since it only runs periodically, - // missing the short-lived connections. 
- if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP { - tuple = fourTuple{ - f.Reply.Layer3.DstIP, - f.Reply.Layer3.SrcIP, - uint16(f.Reply.Layer4.DstPort), - uint16(f.Reply.Layer4.SrcPort), - } - } - - seenTuples[tuple.key()] = tuple - r.addConnection(&rpt, tuple, "", extraNodeInfo, extraNodeInfo) - }) - } - - if r.conf.WalkProc { - conns, err := r.conf.Scanner.Connections(r.conf.SpyProcs) - if err != nil { - return rpt, err - } - for conn := conns.Next(); conn != nil; conn = conns.Next() { - var ( - namespaceID string - tuple = fourTuple{ - conn.LocalAddress.String(), - conn.RemoteAddress.String(), - conn.LocalPort, - conn.RemotePort, - } - toNodeInfo = map[string]string{Procspied: "true"} - fromNodeInfo = map[string]string{Procspied: "true"} - ) - if conn.Proc.PID > 0 { - fromNodeInfo[process.PID] = strconv.FormatUint(uint64(conn.Proc.PID), 10) - fromNodeInfo[report.HostNodeID] = hostNodeID - } - - if conn.Proc.NetNamespaceID > 0 { - namespaceID = strconv.FormatUint(conn.Proc.NetNamespaceID, 10) - } - - // If we've already seen this connection, we should know the direction - // (or have already figured it out), so we normalize and use the - // canonical direction. Otherwise, we can use a port-heuristic to guess - // the direction. - canonical, ok := seenTuples[tuple.key()] - if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) { - tuple.reverse() - toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo - } - r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) - } - } + r.connectionTracker.ReportConnections(&rpt) r.natMapper.applyNAT(rpt, r.conf.HostID) return rpt, nil } - -func (r *Reporter) addConnection(rpt *report.Report, t fourTuple, namespaceID string, extraFromNode, extraToNode map[string]string) { - var ( - fromNode = r.makeEndpointNode(namespaceID, t.fromAddr, t.fromPort, extraFromNode) - toNode = r.makeEndpointNode(namespaceID, t.toAddr, t.toPort, extraToNode) - ) - rpt.Endpoint = rpt.Endpoint.AddNode(fromNode.WithEdge(toNode.ID, report.EdgeMetadata{})) - rpt.Endpoint = rpt.Endpoint.AddNode(toNode) -} - -func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16, extra map[string]string) report.Node { - portStr := strconv.Itoa(int(port)) - node := report.MakeNodeWith( - report.MakeEndpointNodeID(r.conf.HostID, namespaceID, addr, portStr), - map[string]string{Addr: addr, Port: portStr}) - if names := r.conf.DNSSnooper.CachedNamesForIP(addr); len(names) > 0 { - node = node.WithSet(SnoopedDNSNames, report.MakeStringSet(names...)) - } - if names, err := r.reverseResolver.get(addr); err == nil && len(names) > 0 { - node = node.WithSet(ReverseDNSNames, report.MakeStringSet(names...)) - } - if extra != nil { - node = node.WithLatests(extra) - } - return node -} - -func newu64(i uint64) *uint64 { - return &i -} diff --git a/prog/main.go b/prog/main.go index 00fba1647c..1efe1cf93e 100644 --- a/prog/main.go +++ b/prog/main.go @@ -99,6 +99,7 @@ type probeFlags struct { spyProcs bool // Associate endpoints with processes (must be root) procEnabled bool // Produce process topology & process nodes in endpoint + useEbpfConn bool // Enable connection tracking with eBPF procRoot string dockerEnabled bool @@ -283,6 +284,7 @@ func main() { flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)") flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem") flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & 
include procspied connections") + flag.BoolVar(&flags.probe.useEbpfConn, "probe.ebpf.connections", false, "enable connection tracking with eBPF") // Docker flag.BoolVar(&flags.probe.dockerEnabled, "probe.docker", false, "collect Docker-related attributes for processes") diff --git a/prog/probe.go b/prog/probe.go index e9cb8d9772..cb74cf70c1 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -161,7 +161,10 @@ func probeMain(flags probeFlags, targets []appclient.Target) { var scanner procspy.ConnectionScanner if flags.procEnabled { processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot)) - scanner = procspy.NewConnectionScanner(processCache) + // The eBPF tracker finds connections itself and does not need the connection scanner + if !flags.useEbpfConn { + scanner = procspy.NewConnectionScanner(processCache) + } p.AddTicker(processCache) p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies, flags.noCommandLineArguments)) } @@ -179,6 +182,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { SpyProcs: flags.spyProcs, UseConntrack: flags.useConntrack, WalkProc: flags.procEnabled, + UseEbpfConn: flags.useEbpfConn, ProcRoot: flags.procRoot, BufferSize: flags.conntrackBufferSize, Scanner: scanner, diff --git a/render/filters.go b/render/filters.go index da39e0c440..28a0fb4834 100644 --- a/render/filters.go +++ b/render/filters.go @@ -82,6 +82,18 @@ func ColorConnected(r Renderer) Renderer { // FilterFunc is the function type used by Filters type FilterFunc func(report.Node) bool +// AnyFilterFunc checks if any of the filterfuncs matches. +func AnyFilterFunc(fs ...FilterFunc) FilterFunc { + return func(n report.Node) bool { + for _, f := range fs { + if f(n) { + return true + } + } + return false + } +} + // ComposeFilterFuncs composes filterfuncs into a single FilterFunc checking all. func ComposeFilterFuncs(fs ...FilterFunc) FilterFunc { return func(n report.Node) bool { @@ -224,15 +236,30 @@ func IsRunning(n report.Node) bool { // IsStopped checks if the node is *not* a running docker container var IsStopped = Complement(IsRunning) +func nonProcspiedFilter(node report.Node) bool { + _, ok := node.Latest.Lookup(endpoint.Procspied) + return ok +} + +func nonEBPFFilter(node report.Node) bool { + _, ok := node.Latest.Lookup(endpoint.EBPF) + return ok +} + // FilterNonProcspied removes endpoints which were not found in procspy. func FilterNonProcspied(r Renderer) Renderer { - return MakeFilter( - func(node report.Node) bool { - _, ok := node.Latest.Lookup(endpoint.Procspied) - return ok - }, - r, - ) + return MakeFilter(nonProcspiedFilter, r) +} + +// FilterNonEBPF removes endpoints which were not found via eBPF. +func FilterNonEBPF(r Renderer) Renderer { + return MakeFilter(nonEBPFFilter, r) +} + +// FilterNonProcspiedNorEBPF removes endpoints which were not found in procspy +// nor via eBPF. +func FilterNonProcspiedNorEBPF(r Renderer) Renderer { + return MakeFilter(AnyFilterFunc(nonProcspiedFilter, nonEBPFFilter), r) } // IsApplication checks if the node is an "application" node diff --git a/render/process.go b/render/process.go index 82d756fb8f..13e52e830a 100644 --- a/render/process.go +++ b/render/process.go @@ -23,7 +23,7 @@ func renderProcesses(rpt report.Report) bool { } // EndpointRenderer is a Renderer which produces a renderable endpoint graph. 
-var EndpointRenderer = FilterNonProcspied(SelectEndpoint) +var EndpointRenderer = FilterNonProcspiedNorEBPF(SelectEndpoint) // ProcessRenderer is a Renderer which produces a renderable process // graph by merging the endpoint graph and the process topology. diff --git a/scope b/scope index 57e7bb7ccd..06c7fe44b9 100755 --- a/scope +++ b/scope @@ -170,6 +170,7 @@ launch_command() { echo docker run --privileged $USERNS_HOST -d --name="$SCOPE_CONTAINER_NAME" --net=host --pid=host \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /var/run/scope/plugins:/var/run/scope/plugins \ + -v /sys/kernel/debug:/sys/kernel/debug \ -e CHECKPOINT_DISABLE \ $WEAVESCOPE_DOCKER_ARGS "$SCOPE_IMAGE" --probe.docker=true }