Skip to content

Commit

Permalink
Merge pull request #1863 from weaveworks/1082-nodes-for-known-services
Browse files Browse the repository at this point in the history
Add pseudo-nodes for known services
  • Loading branch information
Alfonso Acosta authored Sep 20, 2016
2 parents d008919 + c5ac315 commit c64dfa6
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 15 deletions.
208 changes: 208 additions & 0 deletions probe/endpoint/dns_snooper_linux_amd64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package endpoint

import (
"bytes"
"fmt"
"math"
"time"

log "github.com/Sirupsen/logrus"
"github.com/bluele/gcache"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
)

const (
bufSize = 8 * 1024 * 1024 // 8MB
maxReverseDNSrecords = 10000
)

// DNSSnooper is a snopper of DNS queries
type DNSSnooper struct {
stop chan struct{}
pcapHandle *pcap.Handle
reverseDNSCache gcache.Cache
}

// NewDNSSnooper creates a new snooper of DNS queries
func NewDNSSnooper() (*DNSSnooper, error) {
pcapHandle, err := newPcapHandle()
if err != nil {
return nil, err
}
reverseDNSCache := gcache.New(maxReverseDNSrecords).LRU().Build()

s := &DNSSnooper{
stop: make(chan struct{}),
pcapHandle: pcapHandle,
reverseDNSCache: reverseDNSCache,
}
go s.run()
return s, nil
}

func newPcapHandle() (*pcap.Handle, error) {
inactive, err := pcap.NewInactiveHandle("any")
if err != nil {
return nil, err
}
defer inactive.CleanUp()
// pcap timeout blackmagic copied from Weave Net to reduce CPU consumption
// see https://github.com/weaveworks/weave/commit/025315363d5ea8b8265f1b3ea800f24df2be51a4
if err = inactive.SetTimeout(time.Duration(math.MaxInt64)); err != nil {
return nil, err
}
if err = inactive.SetImmediateMode(true); err != nil {
// If gopacket is compiled against an older pcap.h that
// doesn't have pcap_set_immediate_mode, it supplies a dummy
// definition that always returns PCAP_ERROR. That becomes
// "Generic error", which is not very helpful. The real
// pcap_set_immediate_mode never returns PCAP_ERROR, so this
// turns it into a more informative message.
if fmt.Sprint(err) == "Generic error" {
return nil, fmt.Errorf("compiled against an old version of libpcap; please compile against libpcap-1.5.0 or later")
}

return nil, err
}
if err = inactive.SetBufferSize(bufSize); err != nil {
return nil, err
}
pcapHandle, err := inactive.Activate()
if err != nil {
return nil, err
}
if err := pcapHandle.SetDirection(pcap.DirectionIn); err != nil {
pcapHandle.Close()
return nil, err
}
if err := pcapHandle.SetBPFFilter("inbound and port 53"); err != nil {
pcapHandle.Close()
return nil, err
}

return pcapHandle, nil
}

// CachedNamesForIP obtains the domains associated to an IP,
// obtained while snooping A-record queries
func (s *DNSSnooper) CachedNamesForIP(ip string) []string {
result := []string{}
if s == nil {
return result
}
domains, err := s.reverseDNSCache.Get(ip)
if err != nil {
return result
}

for domain := range domains.(map[string]struct{}) {
result = append(result, domain)
}

return result
}

// Stop makes the snooper stop inspecting DNS communications
func (s *DNSSnooper) Stop() {
if s != nil {
close(s.stop)
}
}

func (s *DNSSnooper) run() {
var (
decodedLayers []gopacket.LayerType
dns layers.DNS
udp layers.UDP
tcp layers.TCP
ip4 layers.IPv4
ip6 layers.IPv6
eth layers.Ethernet
sll layers.LinuxSLL
)

// assumes that the "any" interface is being used (see https://wiki.wireshark.org/SLL)
packetParser := gopacket.NewDecodingLayerParser(layers.LayerTypeLinuxSLL, &sll, &eth, &ip4, &ip6, &udp, &tcp, &dns)

for {
select {
case <-s.stop:
s.pcapHandle.Close()
return
default:
}

packet, _, err := s.pcapHandle.ZeroCopyReadPacketData()
if err != nil {
// TimeoutExpired is acceptable due to the Timeout black magic
// on the handle
if err != pcap.NextErrorTimeoutExpired {
log.Errorf("DNSSnooper: error reading packet data: %s", err)
}
continue
}

if err := packetParser.DecodeLayers(packet, &decodedLayers); err != nil {
log.Errorf("DNSSnooper: error decoding packet: %s", err)
continue
}

for _, layerType := range decodedLayers {
if layerType == layers.LayerTypeDNS {
s.processDNSMessage(&dns)
}
}
}
}

func (s *DNSSnooper) processDNSMessage(dns *layers.DNS) {

// Only consider responses to singleton, A-record questions
if !dns.QR || dns.ResponseCode != 0 || len(dns.Questions) != 1 {
return
}
question := dns.Questions[0]
if question.Type != layers.DNSTypeA || question.Class != layers.DNSClassIN {
return
}

var (
domainQueried = question.Name
records = append(dns.Answers, dns.Additionals...)
ips = map[string]struct{}{}
alias []byte
)

// Traverse records for a CNAME first since the DNS RFCs don't seem to guarantee it
// appearing before its A-records
for _, record := range records {
if record.Type == layers.DNSTypeCNAME && record.Class == layers.DNSClassIN && bytes.Equal(domainQueried, record.Name) {
alias = record.CNAME
break
}
}

// Finally, get the answer
for _, record := range records {
if record.Type != layers.DNSTypeA || record.Class != layers.DNSClassIN {
continue
}
if bytes.Equal(domainQueried, record.Name) || (alias != nil && bytes.Equal(alias, record.Name)) {
ips[record.IP.String()] = struct{}{}
}
}

// Update cache
newDomain := string(domainQueried)
log.Debugf("DNSSnooper: caught DNS lookup: %s -> %v", newDomain, ips)
for ip := range ips {
if existingDomains, err := s.reverseDNSCache.Get(ip); err != nil {
s.reverseDNSCache.Set(ip, map[string]struct{}{newDomain: {}})
} else {
// TODO: Be smarter about the expiration of entries with pre-existing associated domains
existingDomains.(map[string]struct{})[newDomain] = struct{}{}
}
}
}
25 changes: 25 additions & 0 deletions probe/endpoint/dns_snooper_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// +build darwin arm

// Cross-compiling the snooper requires having pcap binaries,
// let's disable it for now.
// See http://stackoverflow.com/questions/31648793/go-programming-cross-compile-for-revel-framework

package endpoint

// DNSSnooper is a snopper of DNS queries
type DNSSnooper struct{}

// NewDNSSnooper creates a new snooper of DNS queries
func NewDNSSnooper() (*DNSSnooper, error) {
return nil, nil
}

// CachedNamesForIP obtains the domains associated to an IP,
// obtained while snooping A-record queries
func (s *DNSSnooper) CachedNamesForIP(ip string) []string {
return []string{}
}

// Stop makes the snooper stop inspecting DNS communications
func (s *DNSSnooper) Stop() {
}
12 changes: 8 additions & 4 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
Conntracked = "conntracked"
Procspied = "procspied"
ReverseDNSNames = "reverse_dns_names"
SnoopedDNSNames = "snooped_dns_names"
)

// Reporter generates Reports containing the Endpoint topology.
Expand All @@ -33,6 +34,7 @@ type Reporter struct {
scanner procspy.ConnectionScanner
natMapper natMapper
reverseResolver *reverseResolver
dnsSnooper *DNSSnooper
}

// SpyDuration is an exported prometheus metric
Expand All @@ -52,7 +54,7 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, spyProcs, useConntrack, walkProc bool, procRoot string, scanner procspy.ConnectionScanner) *Reporter {
func NewReporter(hostID, hostName string, spyProcs, useConntrack, walkProc bool, procRoot string, scanner procspy.ConnectionScanner, dnsSnooper *DNSSnooper) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
Expand All @@ -62,6 +64,7 @@ func NewReporter(hostID, hostName string, spyProcs, useConntrack, walkProc bool,
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, procRoot, "--any-nat")),
reverseResolver: newReverseResolver(),
scanner: scanner,
dnsSnooper: dnsSnooper,
}
}

Expand Down Expand Up @@ -193,9 +196,10 @@ func (r *Reporter) makeEndpointNode(namespaceID string, addr string, port uint16
node := report.MakeNodeWith(
report.MakeEndpointNodeID(r.hostID, namespaceID, addr, portStr),
map[string]string{Addr: addr, Port: portStr})
// In case we have a reverse resolution for the IP, we can use it for
// the name...
if names, err := r.reverseResolver.get(addr); err == nil {
if names := r.dnsSnooper.CachedNamesForIP(addr); len(names) > 0 {
node = node.WithSet(SnoopedDNSNames, report.MakeStringSet(names...))
}
if names, err := r.reverseResolver.get(addr); err == nil && len(names) > 0 {
node = node.WithSet(ReverseDNSNames, report.MakeStringSet(names...))
}
if extra != nil {
Expand Down
4 changes: 2 additions & 2 deletions probe/endpoint/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestSpyNoProcesses(t *testing.T) {
)

scanner := procspy.FixedScanner(fixConnections)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false, false, "", scanner)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false, false, "", scanner, nil)
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
Expand All @@ -86,7 +86,7 @@ func TestSpyWithProcesses(t *testing.T) {
)

scanner := procspy.FixedScanner(fixConnectionsWithProcesses)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false, true, "", scanner)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false, true, "", scanner, nil)
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)

Expand Down
9 changes: 8 additions & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,14 @@ func probeMain(flags probeFlags) {
p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies))
}

endpointReporter := endpoint.NewReporter(hostID, hostName, flags.spyProcs, flags.useConntrack, flags.procEnabled, flags.procRoot, scanner)
dnsSnooper, err := endpoint.NewDNSSnooper()
if err != nil {
log.Errorf("Failed to start DNS snooper: nodes for external services will be less accurate: %s", err)
} else {
defer dnsSnooper.Stop()
}

endpointReporter := endpoint.NewReporter(hostID, hostName, flags.spyProcs, flags.useConntrack, flags.procEnabled, flags.procRoot, scanner, dnsSnooper)
defer endpointReporter.Stop()
p.AddReporter(endpointReporter)

Expand Down
6 changes: 3 additions & 3 deletions render/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func ShortLivedConnectionJoin(r Renderer, toIPs func(report.Node) []string) Rend
return report.Nodes{}
}

// Propagate the internet pseudo node
if strings.HasSuffix(n.ID, TheInternetID) {
// Propagate the internet and service pseudo nodes
if strings.HasSuffix(n.ID, TheInternetID) || strings.HasPrefix(n.ID, ServiceNodeIDPrefix) {
return report.Nodes{n.ID: n}
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func ShortLivedConnectionJoin(r Renderer, toIPs func(report.Node) []string) Rend
return report.Nodes{}
}
if ip := net.ParseIP(addr); ip != nil && !local.Contains(ip) {
node := theInternetNode(m)
node := externalNode(m)
return report.Nodes{node.ID: node}
}

Expand Down
9 changes: 9 additions & 0 deletions render/detailed/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,22 @@ func pseudoNodeSummary(base NodeSummary, n report.Node) (NodeSummary, bool) {
base.Pseudo = true
base.Rank = n.ID

// try rendering as an internet node
if template, ok := templates[n.ID]; ok {
base.Label = template.Label
base.LabelMinor = template.LabelMinor
base.Shape = report.Cloud
return base, true
}

// try rendering as a known service node
if strings.HasPrefix(n.ID, render.ServiceNodeIDPrefix) {
base.Label = n.ID[len(render.ServiceNodeIDPrefix):]
base.LabelMinor = ""
base.Shape = report.Cloud
return base, true
}

// try rendering it as an uncontained node
if strings.HasPrefix(n.ID, render.MakePseudoNodeID(render.UncontainedID)) {
base.Label = render.UncontainedMajor
Expand Down
4 changes: 2 additions & 2 deletions render/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ func IsApplication(n report.Node) bool {
var IsSystem = Complement(IsApplication)

// IsNotPseudo returns true if the node is not a pseudo node
// or the internet nodes.
// or internet/service nodes.
func IsNotPseudo(n report.Node) bool {
return n.Topology != Pseudo || strings.HasSuffix(n.ID, TheInternetID)
return n.Topology != Pseudo || strings.HasSuffix(n.ID, TheInternetID) || strings.HasPrefix(n.ID, ServiceNodeIDPrefix)
}

// IsNamespace checks if the node is a pod/service in the specified namespace
Expand Down
Loading

0 comments on commit c64dfa6

Please sign in to comment.