Skip to content

Commit

Permalink
New asynchronous, caching DNS resolver for reverse resolutions
Browse files Browse the repository at this point in the history
Add nodes for the remote side of connections iff we have a DNS reverse resolution for the IP.
  • Loading branch information
inercia committed Aug 27, 2015
1 parent 0a9ce21 commit 171e978
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 11 deletions.
19 changes: 16 additions & 3 deletions probe/endpoint/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

// Node metadata keys.
const (
Name = "name"
Addr = "addr" // typically IPv4
Port = "port"
)
Expand All @@ -24,6 +25,7 @@ type Reporter struct {
hostName string
includeProcesses bool
includeNAT bool
revResolver *reverseResolver
}

// SpyDuration is an exported prometheus metric
Expand All @@ -43,12 +45,13 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter {
func NewReporter(hostID, hostName string, includeProcesses bool, revRes *reverseResolver) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
includeNAT: conntrackModulePresent(),
revResolver: revRes,
}
}

Expand Down Expand Up @@ -98,11 +101,21 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {

if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok {
rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName,
Addr: c.LocalAddress.String(),
Name: r.hostName,
Addr: c.LocalAddress.String(),
})
}

// Add the remote address iff we have a reverse resolution available
if _, found := rpt.Address.NodeMetadatas[remoteAddressNodeID]; !found {
if raddr, err := r.revResolver.Get(c.RemoteAddress); err == nil {
rpt.Address.NodeMetadatas[remoteAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
Name: raddr,
Addr: c.RemoteAddress.String(),
})
}
}

countTCPConnection(rpt.Address.EdgeMetadatas, edgeID)

if c.Proc.PID > 0 {
Expand Down
70 changes: 70 additions & 0 deletions probe/endpoint/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package endpoint

import (
"log"
"net"
"time"

"github.com/bluele/gcache"
)

const RAddrCacheLen = 500

const raddrBacklog = 1000
const rAddrCacheExpiration = 30 * time.Minute

type reverseResolver struct {
addresses chan string
cache gcache.Cache
}

// newReverseResolver starts a new reverse resolver that
// performs reverse resolutions and caches the result.
func NewReverseResolver(cacheLen int) *reverseResolver {
r := reverseResolver{
addresses: make(chan string, raddrBacklog),
cache: gcache.New(cacheLen).LRU().Expiration(rAddrCacheExpiration).Build(),
}

go func() {
rate := time.Second / 10
throttle := time.Tick(rate)
for address := range r.addresses {
<-throttle // rate limit our DNS resolutions
// TODO: we should control "r.addresses" length, otherwise the Get'ers could block...
names, err := net.LookupAddr(address)
if err != nil {
log.Printf("could not resolve address %s: %v", address, err)
continue
}
if len(names) > 0 {
r.cache.Set(address, names[0])
}
}
}()

return &r
}

// Get the reverse resolution for a
func (r *reverseResolver) Get(ip net.IP) (string, error) {
address := ip.String()
val, err := r.cache.Get(address)
if err != nil {
if err == gcache.NotFoundKeyError {
// we trigger a asynchronous reverse resolution when not cached, but we return
// immediately so the client does not block...
r.addresses <- address
} else {
log.Printf("could not GET %s from cache: %v", address, err)
}
return "", err
}
return val.(string), nil
}

// Stop the async reverse resolver
func (r *reverseResolver) Stop() {
close(r.addresses)
}

17 changes: 9 additions & 8 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func main() {
dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes")
dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name")
weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router")
revResolvLen = flag.Int("raddr.len", endpoint.RAddrCacheLen, "reverse resolution cache length")
procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem")
captureEnabled = flag.Bool("capture", false, "perform sampled packet capture")
captureInterfaces = flag.String("capture.interfaces", interfaces(), "packet capture on these interfaces")
Expand Down Expand Up @@ -102,14 +103,14 @@ func main() {
}
}

var (
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)}
processCache *process.CachingWalker
)

processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))
revResolver := endpoint.NewReverseResolver(*revResolvLen)
taggers := []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
reporters := []Reporter{
host.NewReporter(hostID, hostName, localNets),
endpoint.NewReporter(hostID, hostName, *spyProcs, revResolver),
process.NewReporter(processCache, hostID),
}

if *dockerEnabled {
if err := report.AddLocalBridge(*dockerBridge); err != nil {
Expand Down

0 comments on commit 171e978

Please sign in to comment.