diff --git a/.gitignore b/.gitignore index 3eabc36b6a..e63a2790db 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ prog/scope docker/scope docker/docker.tgz docker/docker +docker/ebpf.tgz docker/weave docker/weaveutil docker/runsvinit diff --git a/Makefile b/Makefile index c371c333b5..139faa3ade 100644 --- a/Makefile +++ b/Makefile @@ -24,14 +24,31 @@ RM=--rm RUN_FLAGS=-ti BUILD_IN_CONTAINER=true GO_ENV=GOGC=off -GO=env $(GO_ENV) go -NO_CROSS_COMP=unset GOOS GOARCH -GO_HOST=$(NO_CROSS_COMP); $(GO) -WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV) +GO_ENV_ARM=$(GO_ENV) CC=/usr/bin/arm-linux-gnueabihf-gcc GO_BUILD_INSTALL_DEPS=-i GO_BUILD_TAGS='netgo unsafe' GO_BUILD_FLAGS=$(GO_BUILD_INSTALL_DEPS) -ldflags "-extldflags \"-static\" -X main.version=$(SCOPE_VERSION) -s -w" -tags $(GO_BUILD_TAGS) + +ifeq ($(GOOS),linux) +GO_ENV+=CGO_ENABLED=1 +endif + +ifeq ($(GOARCH),arm) +GO=env $(GO_ENV_ARM) go +# The version of go shipped on debian doesn't have some standard library +# packages for arm and when it tries to install them it fails because it +# doesn't have permission to write to /usr/lib +# Use -pkgdir if we build for arm so packages are installed in $HOME +GO_BUILD_FLAGS+=-pkgdir ~ +else +GO=env $(GO_ENV) go +endif + +NO_CROSS_COMP=unset GOOS GOARCH +GO_HOST=$(NO_CROSS_COMP); env $(GO_ENV) go +WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV) IMAGE_TAG=$(shell ./tools/image-tag) +EBPF_IMAGE=kinvolk/tcptracer-bpf:master-769adde all: $(SCOPE_EXPORT) @@ -59,7 +76,11 @@ docker/docker: $(DOCKER_DISTRIB) $(CLOUD_AGENT_EXPORT): docker/Dockerfile.cloud-agent docker/$(SCOPE_EXE) docker/docker docker/weave docker/weaveutil -$(SCOPE_EXPORT): docker/Dockerfile.scope $(CLOUD_AGENT_EXPORT) docker/$(RUNSVINIT) docker/demo.json docker/run-app docker/run-probe docker/entrypoint.sh +docker/ebpf.tgz: Makefile + $(SUDO) docker pull $(EBPF_IMAGE) + CONTAINER_ID=$(shell $(SUDO) docker run -d $(EBPF_IMAGE) /bin/false 2>/dev/null || true); $(SUDO) docker export -o docker/ebpf.tgz $${CONTAINER_ID} + +$(SCOPE_EXPORT): docker/Dockerfile.scope $(CLOUD_AGENT_EXPORT) docker/$(RUNSVINIT) docker/demo.json docker/run-app docker/run-probe docker/entrypoint.sh docker/ebpf.tgz $(RUNSVINIT): vendor/runsvinit/*.go diff --git a/backend/Dockerfile b/backend/Dockerfile index 128366fcb6..c2e1be3263 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -1,6 +1,8 @@ -FROM golang:1.7.4 +FROM ubuntu:yakkety +ENV GOPATH /go +ENV PATH /go/bin:/usr/lib/go-1.7/bin:/usr/bin:/bin:/usr/sbin:/sbin RUN apt-get update && \ - apt-get install -y libpcap-dev python-requests time file shellcheck && \ + apt-get install -y libpcap-dev python-requests time file shellcheck golang-1.7 git gcc-arm-linux-gnueabihf && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN go clean -i net && \ go install -tags netgo std && \ diff --git a/circle.yml b/circle.yml index 6c4a9c5e36..1ffff3ebb8 100644 --- a/circle.yml +++ b/circle.yml @@ -48,7 +48,7 @@ test: parallel: true - cd $SRCDIR; make RM= client-lint static: parallel: true - - cd $SRCDIR; rm -f prog/scope; if [ "$CIRCLE_NODE_INDEX" = "0" ]; then GOARCH=arm make GO_BUILD_INSTALL_DEPS= RM= prog/scope; else GOOS=darwin make GO_BUILD_INSTALL_DEPS= RM= prog/scope; fi: + - cd $SRCDIR; rm -f prog/scope; if [ "$CIRCLE_NODE_INDEX" = "0" ]; then GOARCH=arm GOOS=linux make GO_BUILD_INSTALL_DEPS= RM= prog/scope; else GOOS=darwin GOOS=linux make GO_BUILD_INSTALL_DEPS= RM= prog/scope; fi: parallel: true - cd $SRCDIR; rm -f prog/scope; make RM=: parallel: true diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 3f4711ad73..4879dc26cd 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -65,14 +65,14 @@ type conntrack struct { // flowWalker is something that maintains flows, and provides an accessor // method to walk them. type flowWalker interface { - walkFlows(f func(flow)) + walkFlows(f func(flow, bool)) stop() } type nilFlowWalker struct{} -func (n nilFlowWalker) stop() {} -func (n nilFlowWalker) walkFlows(f func(flow)) {} +func (n nilFlowWalker) stop() {} +func (n nilFlowWalker) walkFlows(f func(flow, bool)) {} // conntrackWalker uses the conntrack command to track network connections and // implement flowWalker. @@ -463,14 +463,14 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { // walkFlows calls f with all active flows and flows that have come and gone // since the last call to walkFlows -func (c *conntrackWalker) walkFlows(f func(flow)) { +func (c *conntrackWalker) walkFlows(f func(flow, bool)) { c.Lock() defer c.Unlock() for _, flow := range c.activeFlows { - f(flow) + f(flow, true) } for _, flow := range c.bufferedFlows { - f(flow) + f(flow, false) } c.bufferedFlows = c.bufferedFlows[:0] } diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go new file mode 100644 index 0000000000..6c96b426d1 --- /dev/null +++ b/probe/endpoint/ebpf.go @@ -0,0 +1,256 @@ +package endpoint + +import ( + "bytes" + "encoding/binary" + "net" + "strconv" + "sync" + + log "github.com/Sirupsen/logrus" + bpflib "github.com/iovisor/gobpf/elf" + "github.com/kinvolk/tcptracer-bpf/pkg/byteorder" + "github.com/kinvolk/tcptracer-bpf/pkg/offsetguess" +) + +type eventType uint32 + +// These constants should be in sync with the equivalent definitions in the ebpf program. +const ( + _ eventType = iota + EventConnect + EventAccept + EventClose +) + +func (e eventType) String() string { + switch e { + case EventConnect: + return "connect" + case EventAccept: + return "accept" + case EventClose: + return "close" + default: + return "unknown" + } +} + +// tcpEvent should be in sync with the struct in the ebpf maps. +type tcpEvent struct { + // Timestamp must be the first field, the sorting depends on it + Timestamp uint64 + + CPU uint64 + Type uint32 + Pid uint32 + Comm [16]byte + SAddr uint32 + DAddr uint32 + SPort uint16 + DPort uint16 + NetNS uint32 +} + +// An ebpfConnection represents a TCP connection +type ebpfConnection struct { + tuple fourTuple + networkNamespace string + incoming bool + pid int +} + +type eventTracker interface { + handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) + hasDied() bool + run() + walkConnections(f func(ebpfConnection)) + initialize() + isInitialized() bool + stop() +} + +var ebpfTracker *EbpfTracker + +// nilTracker is a tracker that does nothing, and it implements the eventTracker interface. +// It is returned when the useEbpfConn flag is false. +type nilTracker struct{} + +func (n nilTracker) handleConnection(_ string, _ fourTuple, _ int, _ string) {} +func (n nilTracker) hasDied() bool { return true } +func (n nilTracker) run() {} +func (n nilTracker) walkConnections(f func(ebpfConnection)) {} +func (n nilTracker) initialize() {} +func (n nilTracker) isInitialized() bool { return false } +func (n nilTracker) stop() {} + +// EbpfTracker contains the sets of open and closed TCP connections. +// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`. +type EbpfTracker struct { + sync.Mutex + reader *bpflib.Module + initialized bool + dead bool + + openConnections map[string]ebpfConnection + closedConnections []ebpfConnection +} + +func newEbpfTracker(useEbpfConn bool) eventTracker { + if !useEbpfConn { + return &nilTracker{} + } + + bpfObjectFile, err := findBpfObjectFile() + if err != nil { + log.Errorf("Cannot find BPF object file: %v", err) + return &nilTracker{} + } + + bpfPerfEvent := bpflib.NewModule(bpfObjectFile) + if bpfPerfEvent == nil { + return &nilTracker{} + } + err = bpfPerfEvent.Load() + if err != nil { + log.Errorf("Error loading BPF program: %v", err) + return &nilTracker{} + } + + bpfPerfEvent.EnableKprobes() + + tracker := &EbpfTracker{ + openConnections: map[string]ebpfConnection{}, + reader: bpfPerfEvent, + } + tracker.run() + + ebpfTracker = tracker + return tracker +} + +func (t *EbpfTracker) handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) { + t.Lock() + defer t.Unlock() + log.Debugf("handleConnection(%v, [%v:%v --> %v:%v], pid=%v, netNS=%v)", + eventType, tuple.fromAddr, tuple.fromPort, tuple.toAddr, tuple.toPort, pid, networkNamespace) + + switch eventType { + case "connect": + conn := ebpfConnection{ + incoming: false, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case "accept": + conn := ebpfConnection{ + incoming: true, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case "close": + if deadConn, ok := t.openConnections[tuple.String()]; ok { + delete(t.openConnections, tuple.String()) + t.closedConnections = append(t.closedConnections, deadConn) + } else { + log.Errorf("EbpfTracker error: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace) + } + } +} + +func tcpEventCallback(event tcpEvent) { + var alive bool + typ := eventType(event.Type) + pid := event.Pid & 0xffffffff + + saddrbuf := make([]byte, 4) + daddrbuf := make([]byte, 4) + + byteorder.Host.PutUint32(saddrbuf, uint32(event.SAddr)) + byteorder.Host.PutUint32(daddrbuf, uint32(event.DAddr)) + + sIP := net.IPv4(saddrbuf[0], saddrbuf[1], saddrbuf[2], saddrbuf[3]) + dIP := net.IPv4(daddrbuf[0], daddrbuf[1], daddrbuf[2], daddrbuf[3]) + + sport := event.SPort + dport := event.DPort + + if typ.String() == "close" || typ.String() == "unknown" { + alive = true + } else { + alive = false + } + tuple := fourTuple{sIP.String(), dIP.String(), uint16(sport), uint16(dport), alive} + + log.Debugf("tcpEventCallback(%v, [%v:%v --> %v:%v], pid=%v, netNS=%v, cpu=%v, ts=%v)", + typ.String(), tuple.fromAddr, tuple.fromPort, tuple.toAddr, tuple.toPort, pid, event.NetNS, event.CPU, event.Timestamp) + ebpfTracker.handleConnection(typ.String(), tuple, int(pid), strconv.FormatUint(uint64(event.NetNS), 10)) +} + +// walkConnections calls f with all open connections and connections that have come and gone +// since the last call to walkConnections +func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) { + t.Lock() + defer t.Unlock() + + for _, connection := range t.openConnections { + f(connection) + } + for _, connection := range t.closedConnections { + f(connection) + } + t.closedConnections = t.closedConnections[:0] +} + +func (t *EbpfTracker) run() { + if err := offsetguess.Guess(t.reader); err != nil { + log.Errorf("%v\n", err) + return + } + + channel := make(chan []byte) + + go func() { + var event tcpEvent + for { + data := <-channel + err := binary.Read(bytes.NewBuffer(data), byteorder.Host, &event) + if err != nil { + log.Errorf("Failed to decode received data: %s\n", err) + continue + } + tcpEventCallback(event) + } + }() + + pmIPv4, err := bpflib.InitPerfMap(t.reader, "tcp_event_ipv4", channel) + if err != nil { + log.Errorf("%v\n", err) + return + } + + pmIPv4.PollStart() +} + +func (t *EbpfTracker) hasDied() bool { + t.Lock() + defer t.Unlock() + + return t.dead +} + +func (t *EbpfTracker) initialize() { + t.initialized = true +} + +func (t *EbpfTracker) isInitialized() bool { + return t.initialized +} + +func (t *EbpfTracker) stop() { + // TODO: stop the go routine in run() +} diff --git a/probe/endpoint/ebpf_linux.go b/probe/endpoint/ebpf_linux.go new file mode 100644 index 0000000000..352c03ef07 --- /dev/null +++ b/probe/endpoint/ebpf_linux.go @@ -0,0 +1,7 @@ +//+build linux + +package endpoint + +func findBpfObjectFile() (string, error) { + return "/usr/libexec/scope/ebpf/ebpf.o", nil +} diff --git a/probe/endpoint/ebpf_unsupported.go b/probe/endpoint/ebpf_unsupported.go new file mode 100644 index 0000000000..011181fa96 --- /dev/null +++ b/probe/endpoint/ebpf_unsupported.go @@ -0,0 +1,9 @@ +//+build !linux + +package endpoint + +import "fmt" + +func findBpfObjectFile() (string, error) { + return "", fmt.Errorf("not supported") +} diff --git a/probe/endpoint/four_tuple.go b/probe/endpoint/four_tuple.go new file mode 100644 index 0000000000..2624ecd4b6 --- /dev/null +++ b/probe/endpoint/four_tuple.go @@ -0,0 +1,44 @@ +package endpoint + +import ( + "fmt" + "sort" + "strings" +) + +// fourTuple is an (IP, port, IP, port) tuple, representing a connection +type fourTuple struct { + fromAddr, toAddr string + fromPort, toPort uint16 + alive bool +} + +func (t fourTuple) String() string { + return fmt.Sprintf("%s:%d-%s:%d", t.fromAddr, t.fromPort, t.toAddr, t.toPort) +} + +// key is a sortable direction-independent key for tuples, used to look up a +// fourTuple when you are unsure of its direction. +func (t fourTuple) key() string { + key := []string{ + fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), + fmt.Sprintf("%s:%d", t.toAddr, t.toPort), + } + sort.Strings(key) + return strings.Join(key, " ") +} + +// reverse flips the direction of the tuple +func (t *fourTuple) reverse() { + t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort +} + +// reverse flips the direction of a tuple, without side effects +func reverse(tuple fourTuple) fourTuple { + return fourTuple{ + fromAddr: tuple.toAddr, + toAddr: tuple.fromAddr, + fromPort: tuple.toPort, + toPort: tuple.fromPort, + } +} diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 88c9e8d696..382fe514fa 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -49,7 +49,7 @@ func toMapping(f flow) *endpointMapping { // applyNAT duplicates Nodes in the endpoint topology of a report, based on // the NAT table. func (n natMapper) applyNAT(rpt report.Report, scope string) { - n.flowWalker.walkFlows(func(f flow) { + n.flowWalker.walkFlows(func(f flow, alive bool) { mapping := toMapping(f) realEndpointPort := strconv.Itoa(mapping.originalPort) diff --git a/probe/endpoint/nat_internal_test.go b/probe/endpoint/nat_internal_test.go index 2214be5f44..b88333c305 100644 --- a/probe/endpoint/nat_internal_test.go +++ b/probe/endpoint/nat_internal_test.go @@ -13,9 +13,9 @@ type mockFlowWalker struct { flows []flow } -func (m *mockFlowWalker) walkFlows(f func(flow)) { +func (m *mockFlowWalker) walkFlows(f func(f flow, alive bool)) { for _, flow := range m.flows { - f(flow) + f(flow, true) } } diff --git a/probe/endpoint/procspy/background_reader_linux.go b/probe/endpoint/procspy/background_reader_linux.go index d618998a60..7c729df3e1 100644 --- a/probe/endpoint/procspy/background_reader_linux.go +++ b/probe/endpoint/procspy/background_reader_linux.go @@ -38,6 +38,28 @@ func newBackgroundReader(walker process.Walker) *backgroundReader { return br } +func newForegroundReader(walker process.Walker) *backgroundReader { + br := &backgroundReader{ + stopc: make(chan struct{}), + latestSockets: map[uint64]*Proc{}, + } + var ( + walkc = make(chan walkResult) + ticker = time.NewTicker(time.Millisecond) // fire every millisecond + pWalker = newPidWalker(walker, ticker.C, fdBlockSize) + ) + + go performWalk(pWalker, walkc) + + result := <-walkc + br.mtx.Lock() + br.latestBuf = result.buf + br.latestSockets = result.sockets + br.mtx.Unlock() + + return br +} + func (br *backgroundReader) stop() { close(br.stopc) } diff --git a/probe/endpoint/procspy/spy_darwin.go b/probe/endpoint/procspy/spy_darwin.go index bec6f2a132..394407f39f 100644 --- a/probe/endpoint/procspy/spy_darwin.go +++ b/probe/endpoint/procspy/spy_darwin.go @@ -18,6 +18,11 @@ func NewConnectionScanner(_ process.Walker) ConnectionScanner { return &darwinScanner{} } +// NewSyncConnectionScanner creates a new synchronous Darwin ConnectionScanner +func NewSyncConnectionScanner(_ process.Walker) ConnectionScanner { + return &darwinScanner{} +} + type darwinScanner struct{} // Connections returns all established (TCP) connections. No need to be root diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 6966f852f9..cac7661cd5 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -38,6 +38,12 @@ func NewConnectionScanner(walker process.Walker) ConnectionScanner { return &linuxScanner{br} } +// NewSyncConnectionScanner creates a new synchronous Linux ConnectionScanner +func NewSyncConnectionScanner(walker process.Walker) ConnectionScanner { + br := newForegroundReader(walker) + return &linuxScanner{br} +} + type linuxScanner struct { br *backgroundReader } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index f58fce91a1..301dbf2374 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,14 +1,12 @@ package endpoint import ( - "fmt" - "sort" "strconv" - "strings" "time" "github.com/prometheus/client_golang/prometheus" + log "github.com/Sirupsen/logrus" "github.com/weaveworks/scope/probe/endpoint/procspy" "github.com/weaveworks/scope/probe/process" "github.com/weaveworks/scope/report" @@ -19,6 +17,7 @@ const ( Addr = "addr" // typically IPv4 Port = "port" Conntracked = "conntracked" + EBPF = "eBPF" Procspied = "procspied" ReverseDNSNames = "reverse_dns_names" SnoopedDNSNames = "snooped_dns_names" @@ -31,6 +30,7 @@ type ReporterConfig struct { SpyProcs bool UseConntrack bool WalkProc bool + UseEbpfConn bool ProcRoot string BufferSize int Scanner procspy.ConnectionScanner @@ -41,6 +41,7 @@ type ReporterConfig struct { type Reporter struct { conf ReporterConfig flowWalker flowWalker // interface + ebpfTracker eventTracker natMapper natMapper reverseResolver *reverseResolver } @@ -66,6 +67,7 @@ func NewReporter(conf ReporterConfig) *Reporter { return &Reporter{ conf: conf, flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize), + ebpfTracker: newEbpfTracker(conf.UseEbpfConn), natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")), reverseResolver: newReverseResolver(), } @@ -80,27 +82,7 @@ func (r *Reporter) Stop() { r.natMapper.stop() r.reverseResolver.stop() r.conf.Scanner.Stop() -} - -type fourTuple struct { - fromAddr, toAddr string - fromPort, toPort uint16 -} - -// key is a sortable direction-independent key for tuples, used to look up a -// fourTuple, when you are unsure of it's direction. -func (t fourTuple) key() string { - key := []string{ - fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), - fmt.Sprintf("%s:%d", t.toAddr, t.toPort), - } - sort.Strings(key) - return strings.Join(key, " ") -} - -// reverse flips the direction of the tuple -func (t *fourTuple) reverse() { - t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort + r.ebpfTracker.stop() } // Report implements Reporter. @@ -111,19 +93,22 @@ func (r *Reporter) Report() (report.Report, error) { hostNodeID := report.MakeHostNodeID(r.conf.HostID) rpt := report.MakeReport() + seenTuples := map[string]fourTuple{} // Consult the flowWalker for short-lived connections - { + // With eBPF, this is used only in the first round to build seenTuples for WalkProc + if r.conf.WalkProc || !r.conf.UseEbpfConn { extraNodeInfo := map[string]string{ Conntracked: "true", } - r.flowWalker.walkFlows(func(f flow) { + r.flowWalker.walkFlows(func(f flow, alive bool) { tuple := fourTuple{ f.Original.Layer3.SrcIP, f.Original.Layer3.DstIP, uint16(f.Original.Layer4.SrcPort), uint16(f.Original.Layer4.DstPort), + alive, } // Handle DNAT-ed short-lived connections. // The NAT mapper won't help since it only runs periodically, @@ -134,6 +119,7 @@ func (r *Reporter) Report() (report.Report, error) { f.Reply.Layer3.SrcIP, uint16(f.Reply.Layer4.DstPort), uint16(f.Reply.Layer4.SrcPort), + alive, } } @@ -144,6 +130,7 @@ func (r *Reporter) Report() (report.Report, error) { if r.conf.WalkProc { conns, err := r.conf.Scanner.Connections(r.conf.SpyProcs) + defer r.procParsingSwitcher() if err != nil { return rpt, err } @@ -155,6 +142,7 @@ func (r *Reporter) Report() (report.Report, error) { conn.RemoteAddress.String(), conn.LocalPort, conn.RemotePort, + true, } toNodeInfo = map[string]string{Procspied: "true"} fromNodeInfo = map[string]string{Procspied: "true"} @@ -174,13 +162,46 @@ func (r *Reporter) Report() (report.Report, error) { // the direction. canonical, ok := seenTuples[tuple.key()] if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) { - tuple.reverse() - toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo + if tuple.alive { + r.feedToEbpf(tuple, true, int(conn.Proc.PID), namespaceID) + } + r.addConnection(&rpt, reverse(tuple), namespaceID, toNodeInfo, fromNodeInfo) + } else { + if tuple.alive { + r.feedToEbpf(tuple, false, int(conn.Proc.PID), namespaceID) + } + r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) } - r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) + } } + // eBPF + if r.conf.UseEbpfConn && !r.ebpfTracker.hasDied() { + r.ebpfTracker.walkConnections(func(e ebpfConnection) { + fromNodeInfo := map[string]string{ + Procspied: "true", + EBPF: "true", + } + toNodeInfo := map[string]string{ + Procspied: "true", + EBPF: "true", + } + if e.pid > 0 { + fromNodeInfo[process.PID] = strconv.Itoa(e.pid) + fromNodeInfo[report.HostNodeID] = hostNodeID + } + log.Debugf("Report: ebpfTracker %v (%v) (%v)", e.tuple, e.pid, e.incoming) + + if e.incoming { + r.addConnection(&rpt, reverse(e.tuple), e.networkNamespace, toNodeInfo, fromNodeInfo) + } else { + r.addConnection(&rpt, e.tuple, e.networkNamespace, fromNodeInfo, toNodeInfo) + } + + }) + } + r.natMapper.applyNAT(rpt, r.conf.HostID) return rpt, nil } @@ -214,3 +235,29 @@ func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16 func newu64(i uint64) *uint64 { return &i } + +// procParsingSwitcher make sure that if eBPF tracking is enabled, +// connections coming from /proc parsing are only walked once. +func (r *Reporter) procParsingSwitcher() { + if r.conf.WalkProc && r.conf.UseEbpfConn { + r.conf.WalkProc = false + r.ebpfTracker.initialize() + + r.flowWalker.stop() + } +} + +// if the eBPF tracker is enabled, feed the existing connections into it +// incoming connections correspond to "accept" events +// outgoing connections correspond to "connect" events +func (r Reporter) feedToEbpf(tuple fourTuple, incoming bool, pid int, namespaceID string) { + if r.conf.UseEbpfConn && !r.ebpfTracker.isInitialized() { + tcpEventType := "connect" + + if incoming { + tcpEventType = "accept" + } + + r.ebpfTracker.handleConnection(tcpEventType, tuple, pid, namespaceID) + } +} diff --git a/prog/main.go b/prog/main.go index 1bbafee25b..64acd1ffe8 100644 --- a/prog/main.go +++ b/prog/main.go @@ -97,6 +97,7 @@ type probeFlags struct { spyProcs bool // Associate endpoints with processes (must be root) procEnabled bool // Produce process topology & process nodes in endpoint + useEbpfConn bool // Enable connection tracking with eBPF procRoot string dockerEnabled bool @@ -278,6 +279,7 @@ func main() { flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)") flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem") flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections") + flag.BoolVar(&flags.probe.useEbpfConn, "probe.ebpf.connections", false, "enable connection tracking with eBPF") // Docker flag.BoolVar(&flags.probe.dockerEnabled, "probe.docker", false, "collect Docker-related attributes for processes") diff --git a/prog/probe.go b/prog/probe.go index 50fee324d4..b8bb7f651e 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -161,8 +161,14 @@ func probeMain(flags probeFlags, targets []appclient.Target) { var scanner procspy.ConnectionScanner if flags.procEnabled { processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot)) - scanner = procspy.NewConnectionScanner(processCache) + processCache.Tick() p.AddTicker(processCache) + // if eBPF tracking is enabled, scan /proc synchronously, and just once + if flags.useEbpfConn { + scanner = procspy.NewSyncConnectionScanner(processCache) + } else { + scanner = procspy.NewConnectionScanner(processCache) + } p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies)) } @@ -179,6 +185,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { SpyProcs: flags.spyProcs, UseConntrack: flags.useConntrack, WalkProc: flags.procEnabled, + UseEbpfConn: flags.useEbpfConn, ProcRoot: flags.procRoot, BufferSize: flags.conntrackBufferSize, Scanner: scanner, diff --git a/scope b/scope index 57e7bb7ccd..06c7fe44b9 100755 --- a/scope +++ b/scope @@ -170,6 +170,7 @@ launch_command() { echo docker run --privileged $USERNS_HOST -d --name="$SCOPE_CONTAINER_NAME" --net=host --pid=host \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /var/run/scope/plugins:/var/run/scope/plugins \ + -v /sys/kernel/debug:/sys/kernel/debug \ -e CHECKPOINT_DISABLE \ $WEAVESCOPE_DOCKER_ARGS "$SCOPE_IMAGE" --probe.docker=true }