Skip to content

Commit

Permalink
New asynchronous, caching DNS resolver for reverse resolutions
Browse files Browse the repository at this point in the history
Add nodes for the remote side of connections iff we have a DNS reverse resolution for the IP.
  • Loading branch information
inercia committed Aug 28, 2015
1 parent 0a9ce21 commit 917a48a
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 13 deletions.
19 changes: 16 additions & 3 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

// Node metadata keys.
const (
Name = "name"
Addr = "addr" // typically IPv4
Port = "port"
)
Expand All @@ -24,6 +25,7 @@ type Reporter struct {
hostName string
includeProcesses bool
includeNAT bool
revResolver *ReverseResolver
}

// SpyDuration is an exported prometheus metric
Expand All @@ -43,12 +45,13 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter {
func NewReporter(hostID, hostName string, includeProcesses bool, revRes *ReverseResolver) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
includeNAT: conntrackModulePresent(),
revResolver: revRes,
}
}

Expand Down Expand Up @@ -98,11 +101,21 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {

if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok {
rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName,
Addr: c.LocalAddress.String(),
Name: r.hostName,
Addr: c.LocalAddress.String(),
})
}

// Add the remote address iff we have a reverse resolution available
if _, found := rpt.Address.NodeMetadatas[remoteAddressNodeID]; !found {
if raddr, err := r.revResolver.Get(c.RemoteAddress); err == nil {
rpt.Address.NodeMetadatas[remoteAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
Name: raddr,
Addr: c.RemoteAddress.String(),
})
}
}

countTCPConnection(rpt.Address.EdgeMetadatas, edgeID)

if c.Proc.PID > 0 {
Expand Down
6 changes: 4 additions & 2 deletions probe/endpoint/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestSpyNoProcesses(t *testing.T) {
nodeName = "frenchs-since-1904" // TODO rename to hostNmae
)

reporter := endpoint.NewReporter(nodeID, nodeName, false)
revResolver := endpoint.NewReverseResolver(endpoint.RAddrCacheLen)
reporter := endpoint.NewReporter(nodeID, nodeName, false, revResolver)
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
Expand Down Expand Up @@ -109,7 +110,8 @@ func TestSpyWithProcesses(t *testing.T) {
nodeName = "fishermans-friend" // TODO rename to hostNmae
)

reporter := endpoint.NewReporter(nodeID, nodeName, false)
revResolver := endpoint.NewReverseResolver(endpoint.RAddrCacheLen)
reporter := endpoint.NewReporter(nodeID, nodeName, false, revResolver)
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)

Expand Down
71 changes: 71 additions & 0 deletions probe/endpoint/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package endpoint

import (
"log"
"net"
"time"

"github.com/bluele/gcache"
)

// Default cache length
const RAddrCacheLen = 500

const raddrBacklog = 1000
const rAddrCacheExpiration = 30 * time.Minute

// ReverseResolver is a caching, reverse resolver
type ReverseResolver struct {
addresses chan string
cache gcache.Cache
}

// NewReverseResolver starts a new reverse resolver that
// performs reverse resolutions and caches the result.
func NewReverseResolver(cacheLen int) *ReverseResolver {
r := ReverseResolver{
addresses: make(chan string, raddrBacklog),
cache: gcache.New(cacheLen).LRU().Expiration(rAddrCacheExpiration).Build(),
}

go func() {
rate := time.Second / 10
throttle := time.Tick(rate)
for address := range r.addresses {
<-throttle // rate limit our DNS resolutions
names, err := net.LookupAddr(address)
if err != nil {
continue
}
if len(names) > 0 {
r.cache.Set(address, names[0])
}
}
}()

return &r
}

// Get the reverse resolution for a
func (r *ReverseResolver) Get(ip net.IP) (string, error) {
address := ip.String()
val, err := r.cache.Get(address)
if err != nil {
if err == gcache.NotFoundKeyError {
// we trigger a asynchronous reverse resolution when not cached
select {
case r.addresses <- address:
default:
}
} else {
log.Printf("could not GET %s from cache: %v", address, err)
}
return "", err
}
return val.(string), nil
}

// Stop the async reverse resolver
func (r *ReverseResolver) Stop() {
close(r.addresses)
}
17 changes: 9 additions & 8 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func main() {
dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes")
dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name")
weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router")
revResolvLen = flag.Int("raddr.len", endpoint.RAddrCacheLen, "reverse resolution cache length")
procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem")
captureEnabled = flag.Bool("capture", false, "perform sampled packet capture")
captureInterfaces = flag.String("capture.interfaces", interfaces(), "packet capture on these interfaces")
Expand Down Expand Up @@ -102,14 +103,14 @@ func main() {
}
}

var (
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)}
processCache *process.CachingWalker
)

processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))
revResolver := endpoint.NewReverseResolver(*revResolvLen)
taggers := []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
reporters := []Reporter{
host.NewReporter(hostID, hostName, localNets),
endpoint.NewReporter(hostID, hostName, *spyProcs, revResolver),
process.NewReporter(processCache, hostID),
}

if *dockerEnabled {
if err := report.AddLocalBridge(*dockerBridge); err != nil {
Expand Down

0 comments on commit 917a48a

Please sign in to comment.