From 171e978ea9fc5a70066c411838b2d6c3545947cd Mon Sep 17 00:00:00 2001 From: Alvaro Saurin Date: Thu, 27 Aug 2015 14:37:53 +0200 Subject: [PATCH] New asynchronous, caching DNS resolver for reverse resolutions Add nodes for the remote side of connections iff we have a DNS reverse resolution for the IP. --- probe/endpoint/reporter.go | 19 +++++++++-- probe/endpoint/resolver.go | 70 ++++++++++++++++++++++++++++++++++++++ probe/main.go | 17 ++++----- 3 files changed, 95 insertions(+), 11 deletions(-) create mode 100644 probe/endpoint/resolver.go diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 0c7c5bfad4..d77e4a1af3 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -14,6 +14,7 @@ import ( // Node metadata keys. const ( + Name = "name" Addr = "addr" // typically IPv4 Port = "port" ) @@ -24,6 +25,7 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool + revResolver *reverseResolver } // SpyDuration is an exported prometheus metric @@ -43,12 +45,13 @@ var SpyDuration = prometheus.NewSummaryVec( // on the host machine, at the granularity of host and port. That information // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. -func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter { +func NewReporter(hostID, hostName string, includeProcesses bool, revRes *reverseResolver) *Reporter { return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, includeNAT: conntrackModulePresent(), + revResolver: revRes, } } @@ -98,11 +101,21 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ - "name": r.hostName, - Addr: c.LocalAddress.String(), + Name: r.hostName, + Addr: c.LocalAddress.String(), }) } + // Add the remote address iff we have a reverse resolution available + if _, found := rpt.Address.NodeMetadatas[remoteAddressNodeID]; !found { + if raddr, err := r.revResolver.Get(c.RemoteAddress); err == nil { + rpt.Address.NodeMetadatas[remoteAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ + Name: raddr, + Addr: c.RemoteAddress.String(), + }) + } + } + countTCPConnection(rpt.Address.EdgeMetadatas, edgeID) if c.Proc.PID > 0 { diff --git a/probe/endpoint/resolver.go b/probe/endpoint/resolver.go new file mode 100644 index 0000000000..c342f59adc --- /dev/null +++ b/probe/endpoint/resolver.go @@ -0,0 +1,70 @@ +package endpoint + +import ( + "log" + "net" + "time" + + "github.com/bluele/gcache" +) + +const RAddrCacheLen = 500 + +const raddrBacklog = 1000 +const rAddrCacheExpiration = 30 * time.Minute + +type reverseResolver struct { + addresses chan string + cache gcache.Cache +} + +// newReverseResolver starts a new reverse resolver that +// performs reverse resolutions and caches the result. +func NewReverseResolver(cacheLen int) *reverseResolver { + r := reverseResolver{ + addresses: make(chan string, raddrBacklog), + cache: gcache.New(cacheLen).LRU().Expiration(rAddrCacheExpiration).Build(), + } + + go func() { + rate := time.Second / 10 + throttle := time.Tick(rate) + for address := range r.addresses { + <-throttle // rate limit our DNS resolutions + // TODO: we should control "r.addresses" length, otherwise the Get'ers could block... + names, err := net.LookupAddr(address) + if err != nil { + log.Printf("could not resolve address %s: %v", address, err) + continue + } + if len(names) > 0 { + r.cache.Set(address, names[0]) + } + } + }() + + return &r +} + +// Get the reverse resolution for a +func (r *reverseResolver) Get(ip net.IP) (string, error) { + address := ip.String() + val, err := r.cache.Get(address) + if err != nil { + if err == gcache.NotFoundKeyError { + // we trigger a asynchronous reverse resolution when not cached, but we return + // immediately so the client does not block... + r.addresses <- address + } else { + log.Printf("could not GET %s from cache: %v", address, err) + } + return "", err + } + return val.(string), nil +} + +// Stop the async reverse resolver +func (r *reverseResolver) Stop() { + close(r.addresses) +} + diff --git a/probe/main.go b/probe/main.go index d636b386a5..2823a0a127 100644 --- a/probe/main.go +++ b/probe/main.go @@ -40,6 +40,7 @@ func main() { dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes") dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name") weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router") + revResolvLen = flag.Int("raddr.len", endpoint.RAddrCacheLen, "reverse resolution cache length") procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem") captureEnabled = flag.Bool("capture", false, "perform sampled packet capture") captureInterfaces = flag.String("capture.interfaces", interfaces(), "packet capture on these interfaces") @@ -102,14 +103,14 @@ func main() { } } - var ( - taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} - reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)} - processCache *process.CachingWalker - ) - - processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) - reporters = append(reporters, process.NewReporter(processCache, hostID)) + revResolver := endpoint.NewReverseResolver(*revResolvLen) + taggers := []Tagger{newTopologyTagger(), host.NewTagger(hostID)} + processCache := process.NewCachingWalker(process.NewWalker(*procRoot)) + reporters := []Reporter{ + host.NewReporter(hostID, hostName, localNets), + endpoint.NewReporter(hostID, hostName, *spyProcs, revResolver), + process.NewReporter(processCache, hostID), + } if *dockerEnabled { if err := report.AddLocalBridge(*dockerBridge); err != nil {