diff --git a/Makefile b/Makefile index 5628c3d080..cf3c4f0af7 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ GO_HOST=$(NO_CROSS_COMP); $(GO) WITH_GO_HOST_ENV=$(NO_CROSS_COMP); $(GO_ENV) GO_BUILD_INSTALL_DEPS=-i GO_BUILD_TAGS='netgo unsafe' -GO_BUILD_FLAGS=$(GO_BUILD_INSTALL_DEPS) -ldflags "-extldflags \"-static\" -X main.version=$(SCOPE_VERSION) -s -w" -tags $(GO_BUILD_TAGS) +GO_BUILD_FLAGS=$(GO_BUILD_INSTALL_DEPS) -ldflags "-X main.version=$(SCOPE_VERSION) -s -w" -tags $(GO_BUILD_TAGS) IMAGE_TAG=$(shell ./tools/image-tag) all: $(SCOPE_EXPORT) diff --git a/backend/Dockerfile b/backend/Dockerfile index 3c1c137951..28fadfc6c6 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -1,6 +1,9 @@ -FROM golang:1.7.1 +FROM ubuntu:yakkety +ENV GOPATH /go +ENV PATH /go/bin:/usr/lib/go-1.7/bin:/usr/bin:/bin:/usr/sbin:/sbin +RUN echo "deb [trusted=yes] http://52.8.15.63/apt/xenial xenial-nightly main" > /etc/apt/sources.list.d/iovisor.list RUN apt-get update && \ - apt-get install -y libpcap-dev python-requests time file shellcheck && \ + apt-get install -y libpcap-dev python-requests time file shellcheck libelf1 bcc-tools libbcc golang-1.7 git && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN go clean -i net && \ go install -tags netgo std && \ @@ -12,5 +15,6 @@ RUN go get -tags netgo \ github.com/mjibson/esc \ github.com/client9/misspell/cmd/misspell && \ rm -rf /go/pkg/ /go/src/ + COPY build.sh / ENTRYPOINT ["/build.sh"] diff --git a/docker/Dockerfile b/docker/Dockerfile index 029177008b..3972b0284b 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,10 +1,8 @@ -FROM alpine:3.3 +FROM zlim/bcc MAINTAINER Weaveworks Inc LABEL works.weave.role=system WORKDIR /home/weave -RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/community" >>/etc/apk/repositories && \ - apk add --update bash runit conntrack-tools iproute2 util-linux curl && \ - rm -rf /var/cache/apk/* +RUN apt-get update -y && apt-get install -y runit bash conntrack iproute2 util-linux 
libbcc libpcap0.8 localepurge && printf "#NEEDSCONFIGFIRST\nMANDELETE\nSHOWFREEDSPACE\nC\n" > /etc/locale.nopurge && dpkg-reconfigure localepurge && apt-get autoremove -y python curl localepurge && rm -rf /var/lib/dpkg/info/* && rm -rf /var/lib/apt/lists/* ADD ./docker.tgz / ADD ./demo.json / ADD ./weave /usr/bin/ diff --git a/probe/endpoint/ebpf.go b/probe/endpoint/ebpf.go new file mode 100644 index 0000000000..43f907bb23 --- /dev/null +++ b/probe/endpoint/ebpf.go @@ -0,0 +1,533 @@ +package endpoint + +import ( + "encoding/binary" + "fmt" + "net" + "strconv" + "sync" + "unsafe" + + log "github.com/Sirupsen/logrus" + "github.com/iovisor/gobpf" +) + +/* +#cgo CFLAGS: -I/usr/include/bcc/compat +#cgo LDFLAGS: -lbcc +#include <stdint.h> +#include <bcc/bpf_common.h> +#include <bcc/libbpf.h> +#include <bcc/perf_reader.h> + +void *bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid, int cpu); + +extern void tcpEventCb(); + +#define TASK_COMM_LEN 16 // linux/sched.h + +struct tcp_event_t { + char ev_type[12]; + uint32_t pid; + char comm[TASK_COMM_LEN]; + uint32_t saddr; + uint32_t daddr; + uint16_t sport; + uint16_t dport; + uint32_t netns; +}; + +*/ +import "C" + +const source string = ` +#include <uapi/linux/ptrace.h> +#include <net/sock.h> +#include <net/inet_sock.h> +#include <net/net_namespace.h> +#include <bcc/proto.h> + +#define TCP_EVENT_TYPE_CONNECT 1 +#define TCP_EVENT_TYPE_ACCEPT 2 +#define TCP_EVENT_TYPE_CLOSE 3 + +struct tcp_event_t { + char ev_type[12]; + u32 pid; + char comm[TASK_COMM_LEN]; + u32 saddr; + u32 daddr; + u16 sport; + u16 dport; + u32 netns; +}; + +BPF_PERF_OUTPUT(tcp_event); +BPF_HASH(connectsock, u64, struct sock *); +BPF_HASH(closesock, u64, struct sock *); + +int kprobe__tcp_v4_connect(struct pt_regs *ctx, struct sock *sk) +{ + u64 pid = bpf_get_current_pid_tgid(); + + // stash the sock ptr for lookup on return + connectsock.update(&pid, &sk); + + return 0; +}; + +int kretprobe__tcp_v4_connect(struct pt_regs *ctx) +{ + int ret = PT_REGS_RC(ctx); + u64 pid = bpf_get_current_pid_tgid(); + + struct sock **skpp; + skpp = connectsock.lookup(&pid); + if (skpp == 0) { + return 
0; // missed entry + } + + if (ret != 0) { + // failed to send SYN packet, may not have populated + // socket __sk_common.{skc_rcv_saddr, ...} + connectsock.delete(&pid); + return 0; + } + + + // pull in details + struct sock *skp = *skpp; + struct ns_common *ns; + u32 saddr = 0, daddr = 0, net_ns_inum = 0; + u16 sport = 0, dport = 0; + bpf_probe_read(&sport, sizeof(sport), &((struct inet_sock *)skp)->inet_sport); + bpf_probe_read(&saddr, sizeof(saddr), &skp->__sk_common.skc_rcv_saddr); + bpf_probe_read(&daddr, sizeof(daddr), &skp->__sk_common.skc_daddr); + bpf_probe_read(&dport, sizeof(dport), &skp->__sk_common.skc_dport); + +// Get network namespace id, if kernel supports it +#ifdef CONFIG_NET_NS + possible_net_t skc_net; + bpf_probe_read(&skc_net, sizeof(skc_net), &skp->__sk_common.skc_net); + bpf_probe_read(&net_ns_inum, sizeof(net_ns_inum), &skc_net.net->ns.inum); +#else + net_ns_inum = 0; +#endif + + // output + struct tcp_event_t evt = { + .ev_type = "connect", + .pid = pid >> 32, + .saddr = saddr, + .daddr = daddr, + .sport = ntohs(sport), + .dport = ntohs(dport), + .netns = net_ns_inum, + }; + + bpf_get_current_comm(&evt.comm, sizeof(evt.comm)); + + // do not send event if IP address is 0.0.0.0 or port is 0 + if (evt.saddr != 0 && evt.daddr != 0 && evt.sport != 0 && evt.dport != 0) { + tcp_event.perf_submit(ctx, &evt, sizeof(evt)); + } + + connectsock.delete(&pid); + + return 0; +} + +int kprobe__tcp_close(struct pt_regs *ctx, struct sock *sk) +{ + u64 pid = bpf_get_current_pid_tgid(); + + // stash the sock ptr for lookup on return + closesock.update(&pid, &sk); + + return 0; +}; + +int kretprobe__tcp_close(struct pt_regs *ctx) +{ + u64 pid = bpf_get_current_pid_tgid(); + + struct sock **skpp; + skpp = closesock.lookup(&pid); + if (skpp == 0) { + return 0; // missed entry + } + + closesock.delete(&pid); + + // pull in details + struct sock *skp = *skpp; + u16 family = 0; + bpf_probe_read(&family, sizeof(family), &skp->__sk_common.skc_family); + if (family 
!= AF_INET) { + return 0; + } + + u32 saddr = 0, daddr = 0, net_ns_inum = 0; + u16 sport = 0, dport = 0; + bpf_probe_read(&saddr, sizeof(saddr), &skp->__sk_common.skc_rcv_saddr); + bpf_probe_read(&daddr, sizeof(daddr), &skp->__sk_common.skc_daddr); + bpf_probe_read(&sport, sizeof(sport), &((struct inet_sock *)skp)->inet_sport); + bpf_probe_read(&dport, sizeof(dport), &skp->__sk_common.skc_dport); + +// Get network namespace id, if kernel supports it +#ifdef CONFIG_NET_NS + possible_net_t skc_net; + bpf_probe_read(&skc_net, sizeof(skc_net), &skp->__sk_common.skc_net); + bpf_probe_read(&net_ns_inum, sizeof(net_ns_inum), &skc_net.net->ns.inum); +#else + net_ns_inum = 0; +#endif + + // output + struct tcp_event_t evt = { + .ev_type = "close", + .pid = pid >> 32, + .saddr = saddr, + .daddr = daddr, + .sport = ntohs(sport), + .dport = ntohs(dport), + .netns = net_ns_inum, + }; + + bpf_get_current_comm(&evt.comm, sizeof(evt.comm)); + + // do not send event if IP address is 0.0.0.0 or port is 0 + if (evt.saddr != 0 && evt.daddr != 0 && evt.sport != 0 && evt.dport != 0) { + tcp_event.perf_submit(ctx, &evt, sizeof(evt)); + } + + return 0; +} + +int kretprobe__inet_csk_accept(struct pt_regs *ctx) +{ + struct sock *newsk = (struct sock *)PT_REGS_RC(ctx); + u64 pid = bpf_get_current_pid_tgid(); + + if (newsk == NULL) + return 0; + + // check this is TCP + u8 protocol = 0; + // workaround for reading the sk_protocol bitfield: + bpf_probe_read(&protocol, 1, (void *)((long)&newsk->sk_wmem_queued) - 3); + if (protocol != IPPROTO_TCP) + return 0; + + // pull in details + u16 family = 0, lport = 0, dport = 0; + u32 net_ns_inum = 0; + bpf_probe_read(&family, sizeof(family), &newsk->__sk_common.skc_family); + bpf_probe_read(&lport, sizeof(lport), &newsk->__sk_common.skc_num); + bpf_probe_read(&dport, sizeof(dport), &newsk->__sk_common.skc_dport); + +// Get network namespace id, if kernel supports it +#ifdef CONFIG_NET_NS + possible_net_t skc_net; + bpf_probe_read(&skc_net, 
sizeof(skc_net), &newsk->__sk_common.skc_net); + bpf_probe_read(&net_ns_inum, sizeof(net_ns_inum), &skc_net.net->ns.inum); +#else + net_ns_inum = 0; +#endif + + if (family == AF_INET) { + struct tcp_event_t evt = {.ev_type = "accept", .netns = net_ns_inum}; + evt.pid = pid >> 32; + bpf_probe_read(&evt.saddr, sizeof(u32), + &newsk->__sk_common.skc_rcv_saddr); + bpf_probe_read(&evt.daddr, sizeof(u32), + &newsk->__sk_common.skc_daddr); + evt.sport = lport; + evt.dport = ntohs(dport); + bpf_get_current_comm(&evt.comm, sizeof(evt.comm)); + tcp_event.perf_submit(ctx, &evt, sizeof(evt)); + } + // else drop + + return 0; +} +` + +var byteOrder binary.ByteOrder + +func init() { + var i int32 = 0x01020304 + u := unsafe.Pointer(&i) + pb := (*byte)(u) + b := *pb + if b == 0x04 { + byteOrder = binary.LittleEndian + } else { + byteOrder = binary.BigEndian + } +} + +// An ebpfConnection represents a TCP connection +type ebpfConnection struct { + tuple fourTuple + networkNamespace string + incoming bool + pid int +} + +type eventTracker interface { + handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) + hasDied() bool + run() + walkConnections(f func(ebpfConnection)) + initialize() + isInitialized() bool +} + +var ebpfTracker *EbpfTracker + +// nilTracker is a tracker that does nothing, and it implements the eventTracker interface. +// It is returned when the useEbpfConn flag is false. +type nilTracker struct{} + +func (n nilTracker) handleConnection(_ string, _ fourTuple, _ int, _ string) {} +func (n nilTracker) hasDied() bool { return true } +func (n nilTracker) run() {} +func (n nilTracker) walkConnections(f func(ebpfConnection)) {} +func (n nilTracker) initialize() {} +func (n nilTracker) isInitialized() bool { return false } + +// EbpfTracker contains the sets of open and closed TCP connections. +// Closed connections are kept in the `closedConnections` slice for one iteration of `walkConnections`. 
+type EbpfTracker struct { + sync.Mutex + readers []*C.struct_perf_reader + initialized bool + dead bool + + openConnections map[string]ebpfConnection + closedConnections []ebpfConnection +} + +func newEbpfTracker(useEbpfConn bool) eventTracker { + if !useEbpfConn { + return &nilTracker{} + } + m := bpf.NewBpfModule(source, []string{}) + + connect_kprobe, err := m.LoadKprobe("kprobe__tcp_v4_connect") + if err != nil { + return &nilTracker{} + } + + err = m.AttachKprobe("tcp_v4_connect", connect_kprobe) + if err != nil { + return &nilTracker{} + } + + connect_kretprobe, err := m.LoadKprobe("kretprobe__tcp_v4_connect") + if err != nil { + return &nilTracker{} + } + + err = m.AttachKretprobe("tcp_v4_connect", connect_kretprobe) + if err != nil { + return &nilTracker{} + } + + close_kprobe, err := m.LoadKprobe("kprobe__tcp_close") + if err != nil { + return &nilTracker{} + } + + err = m.AttachKprobe("tcp_close", close_kprobe) + if err != nil { + return &nilTracker{} + } + + close_kretprobe, err := m.LoadKprobe("kretprobe__tcp_close") + if err != nil { + return &nilTracker{} + } + + err = m.AttachKretprobe("tcp_close", close_kretprobe) + if err != nil { + return &nilTracker{} + } + + accept_kretprobe, err := m.LoadKprobe("kretprobe__inet_csk_accept") + if err != nil { + return &nilTracker{} + } + + err = m.AttachKretprobe("inet_csk_accept", accept_kretprobe) + if err != nil { + return &nilTracker{} + } + + t := bpf.NewBpfTable(0, m) + readers, err := initPerfMap(t) + if err != nil { + return &nilTracker{} + } + tracker := &EbpfTracker{ + openConnections: map[string]ebpfConnection{}, + readers: readers, + } + go tracker.run() + + ebpfTracker = tracker + return tracker +} + +func initPerfMap(table *bpf.BpfTable) ([]*C.struct_perf_reader, error) { + fd := table.Config()["fd"].(int) + key_size := table.Config()["key_size"].(uint64) + leaf_size := table.Config()["leaf_size"].(uint64) + + if key_size != 4 || leaf_size != 4 { + return nil, fmt.Errorf("wrong size") + } + + key 
:= make([]byte, key_size) + leaf := make([]byte, leaf_size) + keyP := unsafe.Pointer(&key[0]) + leafP := unsafe.Pointer(&leaf[0]) + + readers := []*C.struct_perf_reader{} + + cpu := 0 + res := 0 + for res == 0 { + reader := C.bpf_open_perf_buffer((*[0]byte)(C.tcpEventCb), nil, -1, C.int(cpu)) + if reader == nil { + return nil, fmt.Errorf("failed to get reader") + } + + perfFd := C.perf_reader_fd(reader) + + readers = append(readers, (*C.struct_perf_reader)(reader)) + + // copy perfFd into leaf, respecting the host endianness + byteOrder.PutUint32(leaf, uint32(perfFd)) + + r, err := C.bpf_update_elem(C.int(fd), keyP, leafP, 0) + if r != 0 { + return nil, fmt.Errorf("unable to initialize perf map: %v", err) + } + + res = int(C.bpf_get_next_key(C.int(fd), keyP, keyP)) + cpu++ + } + return readers, nil +} + +func (t *EbpfTracker) handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) { + t.Lock() + defer t.Unlock() + + switch eventType { + case "connect": + conn := ebpfConnection{ + incoming: false, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case "accept": + conn := ebpfConnection{ + incoming: true, + tuple: tuple, + pid: pid, + networkNamespace: networkNamespace, + } + t.openConnections[tuple.String()] = conn + case "close": + if deadConn, ok := t.openConnections[tuple.String()]; ok { + delete(t.openConnections, tuple.String()) + t.closedConnections = append(t.closedConnections, deadConn) + } else { + log.Errorf("EbpfTracker error: unmatched close event: %s pid=%d netns=%s", tuple.String(), pid, networkNamespace) + } + } +} + +func handleConnection(eventType string, tuple fourTuple, pid int, networkNamespace string) { + ebpfTracker.handleConnection(eventType, tuple, pid, networkNamespace) +} + +func (t *EbpfTracker) run() { + for { + C.perf_reader_poll(C.int(len(t.readers)), &t.readers[0], -1) + } +} + +func tcpEventCallback(cpu int, tcpEvent *C.struct_tcp_event_t) { 
+ typ := C.GoString(&tcpEvent.ev_type[0]) + pid := tcpEvent.pid & 0xffffffff + + saddrbuf := make([]byte, 4) + daddrbuf := make([]byte, 4) + + binary.LittleEndian.PutUint32(saddrbuf, uint32(tcpEvent.saddr)) + binary.LittleEndian.PutUint32(daddrbuf, uint32(tcpEvent.daddr)) + + sIP := net.IPv4(saddrbuf[0], saddrbuf[1], saddrbuf[2], saddrbuf[3]) + dIP := net.IPv4(daddrbuf[0], daddrbuf[1], daddrbuf[2], daddrbuf[3]) + + sport := tcpEvent.sport + dport := tcpEvent.dport + netns := tcpEvent.netns + + tuple := fourTuple{sIP.String(), dIP.String(), uint16(sport), uint16(dport)} + handleConnection(typ, tuple, int(pid), strconv.Itoa(int(netns))) +} + +//export tcpEventCb +func tcpEventCb(cb_cookie unsafe.Pointer, raw unsafe.Pointer, raw_size C.int) { + // See src/cc/perf_reader.c:parse_sw() + // struct { + // uint32_t size; + // char data[0]; + // }; + + var tcpEvent C.struct_tcp_event_t + + if int(raw_size) != 4+int(unsafe.Sizeof(tcpEvent)) { + fmt.Printf("invalid perf event: raw_size=%d != %d + %d\n", raw_size, 4, unsafe.Sizeof(tcpEvent)) + return + } + + tcpEventCallback(0, (*C.struct_tcp_event_t)(raw)) +} + +// walkConnections calls f with all open connections and connections that have come and gone +// since the last call to walkConnections +func (t *EbpfTracker) walkConnections(f func(ebpfConnection)) { + t.Lock() + defer t.Unlock() + + for _, connection := range t.openConnections { + f(connection) + } + for _, connection := range t.closedConnections { + f(connection) + } + t.closedConnections = t.closedConnections[:0] +} + +func (t *EbpfTracker) hasDied() bool { + t.Lock() + defer t.Unlock() + + return t.dead +} + +func (t *EbpfTracker) initialize() { + t.initialized = true +} + +func (t *EbpfTracker) isInitialized() bool { + return t.initialized +} diff --git a/probe/endpoint/four_tuple.go b/probe/endpoint/four_tuple.go new file mode 100644 index 0000000000..ff2f80e0c2 --- /dev/null +++ b/probe/endpoint/four_tuple.go @@ -0,0 +1,43 @@ +package endpoint + +import ( + 
"fmt" + "sort" + "strings" +) + +// fourTuple is an (IP, port, IP, port) tuple, representing a connection +type fourTuple struct { + fromAddr, toAddr string + fromPort, toPort uint16 +} + +func (t fourTuple) String() string { + return fmt.Sprintf("%s:%d-%s:%d", t.fromAddr, t.fromPort, t.toAddr, t.toPort) +} + +// key is a sortable direction-independent key for tuples, used to look up a +// fourTuple when you are unsure of its direction. +func (t fourTuple) key() string { + key := []string{ + fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), + fmt.Sprintf("%s:%d", t.toAddr, t.toPort), + } + sort.Strings(key) + return strings.Join(key, " ") +} + +// reverse flips the direction of the tuple +func (t *fourTuple) reverse() { + t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort +} + +// reverse flips the direction of a tuple, without side effects +func reverse(tuple fourTuple) fourTuple { + return fourTuple{ + fromAddr: tuple.toAddr, + toAddr: tuple.fromAddr, + fromPort: tuple.toPort, + toPort: tuple.fromPort, + } +} diff --git a/probe/endpoint/procspy/background_reader_linux.go b/probe/endpoint/procspy/background_reader_linux.go index d618998a60..7c729df3e1 100644 --- a/probe/endpoint/procspy/background_reader_linux.go +++ b/probe/endpoint/procspy/background_reader_linux.go @@ -38,6 +38,28 @@ func newBackgroundReader(walker process.Walker) *backgroundReader { return br } +func newForegroundReader(walker process.Walker) *backgroundReader { + br := &backgroundReader{ + stopc: make(chan struct{}), + latestSockets: map[uint64]*Proc{}, + } + var ( + walkc = make(chan walkResult) + ticker = time.NewTicker(time.Millisecond) // fire every millisecond + pWalker = newPidWalker(walker, ticker.C, fdBlockSize) + ) + + go performWalk(pWalker, walkc) + + result := <-walkc + br.mtx.Lock() + br.latestBuf = result.buf + br.latestSockets = result.sockets + br.mtx.Unlock() + + return br +} + func (br *backgroundReader) stop() { close(br.stopc) } diff --git 
a/probe/endpoint/procspy/spy_darwin.go b/probe/endpoint/procspy/spy_darwin.go index bec6f2a132..394407f39f 100644 --- a/probe/endpoint/procspy/spy_darwin.go +++ b/probe/endpoint/procspy/spy_darwin.go @@ -18,6 +18,11 @@ func NewConnectionScanner(_ process.Walker) ConnectionScanner { return &darwinScanner{} } +// NewSyncConnectionScanner creates a new synchronous Darwin ConnectionScanner +func NewSyncConnectionScanner(_ process.Walker) ConnectionScanner { + return &darwinScanner{} +} + type darwinScanner struct{} // Connections returns all established (TCP) connections. No need to be root diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 61f189881c..f8a3cc72e2 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -38,6 +38,12 @@ func NewConnectionScanner(walker process.Walker) ConnectionScanner { return &linuxScanner{br} } +// NewSyncConnectionScanner creates a new synchronous Linux ConnectionScanner +func NewSyncConnectionScanner(walker process.Walker) ConnectionScanner { + br := newForegroundReader(walker) + return &linuxScanner{br} +} + type linuxScanner struct { br *backgroundReader } diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index f58fce91a1..8f6bf62e41 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,10 +1,7 @@ package endpoint import ( - "fmt" - "sort" "strconv" - "strings" "time" "github.com/prometheus/client_golang/prometheus" @@ -19,6 +16,7 @@ const ( Addr = "addr" // typically IPv4 Port = "port" Conntracked = "conntracked" + EBPF = "eBPF" Procspied = "procspied" ReverseDNSNames = "reverse_dns_names" SnoopedDNSNames = "snooped_dns_names" @@ -31,6 +29,7 @@ type ReporterConfig struct { SpyProcs bool UseConntrack bool WalkProc bool + UseEbpfConn bool ProcRoot string BufferSize int Scanner procspy.ConnectionScanner @@ -41,6 +40,7 @@ type ReporterConfig struct { type Reporter struct { conf ReporterConfig flowWalker 
flowWalker // interface + ebpfTracker eventTracker natMapper natMapper reverseResolver *reverseResolver } @@ -66,6 +66,7 @@ func NewReporter(conf ReporterConfig) *Reporter { return &Reporter{ conf: conf, flowWalker: newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize), + ebpfTracker: newEbpfTracker(conf.UseEbpfConn), natMapper: makeNATMapper(newConntrackFlowWalker(conf.UseConntrack, conf.ProcRoot, conf.BufferSize, "--any-nat")), reverseResolver: newReverseResolver(), } @@ -80,27 +81,7 @@ func (r *Reporter) Stop() { r.natMapper.stop() r.reverseResolver.stop() r.conf.Scanner.Stop() -} - -type fourTuple struct { - fromAddr, toAddr string - fromPort, toPort uint16 -} - -// key is a sortable direction-independent key for tuples, used to look up a -// fourTuple, when you are unsure of it's direction. -func (t fourTuple) key() string { - key := []string{ - fmt.Sprintf("%s:%d", t.fromAddr, t.fromPort), - fmt.Sprintf("%s:%d", t.toAddr, t.toPort), - } - sort.Strings(key) - return strings.Join(key, " ") -} - -// reverse flips the direction of the tuple -func (t *fourTuple) reverse() { - t.fromAddr, t.fromPort, t.toAddr, t.toPort = t.toAddr, t.toPort, t.fromAddr, t.fromPort + // TODO add a Stop method for ebpfTracker } // Report implements Reporter. @@ -111,6 +92,7 @@ func (r *Reporter) Report() (report.Report, error) { hostNodeID := report.MakeHostNodeID(r.conf.HostID) rpt := report.MakeReport() + seenTuples := map[string]fourTuple{} // Consult the flowWalker for short-lived connections @@ -144,6 +126,7 @@ func (r *Reporter) Report() (report.Report, error) { if r.conf.WalkProc { conns, err := r.conf.Scanner.Connections(r.conf.SpyProcs) + defer r.procParsingSwitcher() if err != nil { return rpt, err } @@ -174,13 +157,41 @@ func (r *Reporter) Report() (report.Report, error) { // the direction. 
canonical, ok := seenTuples[tuple.key()] if (ok && canonical != tuple) || (!ok && tuple.fromPort < tuple.toPort) { - tuple.reverse() - toNodeInfo, fromNodeInfo = fromNodeInfo, toNodeInfo + r.feedToEbpf(tuple, true, int(conn.Proc.PID), namespaceID) + r.addConnection(&rpt, reverse(tuple), namespaceID, toNodeInfo, fromNodeInfo) + } else { + r.feedToEbpf(tuple, false, int(conn.Proc.PID), namespaceID) + r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) } - r.addConnection(&rpt, tuple, namespaceID, fromNodeInfo, toNodeInfo) + } } + // eBPF + if r.conf.UseEbpfConn && !r.ebpfTracker.hasDied() { + r.ebpfTracker.walkConnections(func(e ebpfConnection) { + fromNodeInfo := map[string]string{ + Procspied: "true", + EBPF: "true", + } + toNodeInfo := map[string]string{ + Procspied: "true", + EBPF: "true", + } + if e.pid > 0 { + fromNodeInfo[process.PID] = strconv.Itoa(e.pid) + fromNodeInfo[report.HostNodeID] = hostNodeID + } + + if e.incoming { + r.addConnection(&rpt, reverse(e.tuple), e.networkNamespace, toNodeInfo, fromNodeInfo) + } else { + r.addConnection(&rpt, e.tuple, e.networkNamespace, fromNodeInfo, toNodeInfo) + } + + }) + } + r.natMapper.applyNAT(rpt, r.conf.HostID) return rpt, nil } @@ -214,3 +225,27 @@ func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16 func newu64(i uint64) *uint64 { return &i } + +// procParsingSwitcher make sure that if eBPF tracking is enabled, +// connections coming from /proc parsing are only walked once. 
+func (r *Reporter) procParsingSwitcher() { + if r.conf.WalkProc && r.conf.UseEbpfConn { + r.conf.WalkProc = false + r.ebpfTracker.initialize() + } +} + +// if the eBPF tracker is enabled, feed the existing connections into it +// incoming connections correspond to "accept" events +// outgoing connections correspond to "connect" events +func (r Reporter) feedToEbpf(tuple fourTuple, incoming bool, pid int, namespaceID string) { + if r.conf.UseEbpfConn && !r.ebpfTracker.isInitialized() { + tcpEventType := "connect" + + if incoming { + tcpEventType = "accept" + } + + r.ebpfTracker.handleConnection(tcpEventType, tuple, pid, namespaceID) + } +} diff --git a/prog/main.go b/prog/main.go index 956549cea3..7aa935e06d 100644 --- a/prog/main.go +++ b/prog/main.go @@ -95,6 +95,7 @@ type probeFlags struct { spyProcs bool // Associate endpoints with processes (must be root) procEnabled bool // Produce process topology & process nodes in endpoint + useEbpfConn bool // Enable connection tracking with eBPF procRoot string dockerEnabled bool @@ -259,6 +260,7 @@ func main() { flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)") flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem") flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections") + flag.BoolVar(&flags.probe.useEbpfConn, "probe.ebpf.connections", false, "enable connection tracking with eBPF") // Docker flag.BoolVar(&flags.probe.dockerEnabled, "probe.docker", false, "collect Docker-related attributes for processes") diff --git a/prog/probe.go b/prog/probe.go index 5d638ea81f..b64df75aaf 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -143,8 +143,14 @@ func probeMain(flags probeFlags, targets []appclient.Target) { var scanner procspy.ConnectionScanner if flags.procEnabled { processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot)) - scanner = 
procspy.NewConnectionScanner(processCache) + processCache.Tick() p.AddTicker(processCache) + // if eBPF tracking is enabled, scan /proc synchronously, and just once + if flags.useEbpfConn { + scanner = procspy.NewSyncConnectionScanner(processCache) + } else { + scanner = procspy.NewConnectionScanner(processCache) + } p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies)) } @@ -161,6 +167,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { SpyProcs: flags.spyProcs, UseConntrack: flags.useConntrack, WalkProc: flags.procEnabled, + UseEbpfConn: flags.useEbpfConn, ProcRoot: flags.procRoot, BufferSize: flags.conntrackBufferSize, Scanner: scanner, diff --git a/scope b/scope index 4fdb077867..8eab0f8d3c 100755 --- a/scope +++ b/scope @@ -142,6 +142,9 @@ launch_command() { echo docker run --privileged -d --name=$SCOPE_CONTAINER_NAME --net=host --pid=host \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /var/run/scope/plugins:/var/run/scope/plugins \ + -v /usr/src:/usr/src \ + -v /lib/modules:/lib/modules \ + -v /sys/kernel/debug:/sys/kernel/debug \ -e CHECKPOINT_DISABLE \ $WEAVESCOPE_DOCKER_ARGS $SCOPE_IMAGE --probe.docker=true }