From 917a48a45fe1b8a2a73e86fba7ce682d6378f019 Mon Sep 17 00:00:00 2001 From: Alvaro Saurin Date: Thu, 27 Aug 2015 14:37:53 +0200 Subject: [PATCH] New asynchronous, caching DNS resolver for reverse resolutions Add nodes for the remote side of connections iff we have a DNS reverse resolution for the IP. --- probe/endpoint/reporter.go | 19 +++++++-- probe/endpoint/reporter_test.go | 6 ++- probe/endpoint/resolver.go | 71 +++++++++++++++++++++++++++++++++ probe/main.go | 17 ++++---- 4 files changed, 100 insertions(+), 13 deletions(-) create mode 100644 probe/endpoint/resolver.go diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 0c7c5bfad4..e99dea4cd6 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -14,6 +14,7 @@ import ( // Node metadata keys. const ( + Name = "name" Addr = "addr" // typically IPv4 Port = "port" ) @@ -24,6 +25,7 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool + revResolver *ReverseResolver } // SpyDuration is an exported prometheus metric @@ -43,12 +45,13 @@ var SpyDuration = prometheus.NewSummaryVec( // on the host machine, at the granularity of host and port. That information // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. -func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter { +func NewReporter(hostID, hostName string, includeProcesses bool, revRes *ReverseResolver) *Reporter { return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, includeNAT: conntrackModulePresent(), + revResolver: revRes, } } @@ -98,11 +101,21 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ - "name": r.hostName, - Addr: c.LocalAddress.String(), + Name: r.hostName, + Addr: c.LocalAddress.String(), }) } + // Add the remote address iff we have a reverse resolution available + if _, found := rpt.Address.NodeMetadatas[remoteAddressNodeID]; !found { + if raddr, err := r.revResolver.Get(c.RemoteAddress); err == nil { + rpt.Address.NodeMetadatas[remoteAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ + Name: raddr, + Addr: c.RemoteAddress.String(), + }) + } + } + countTCPConnection(rpt.Address.EdgeMetadatas, edgeID) if c.Proc.PID > 0 { diff --git a/probe/endpoint/reporter_test.go b/probe/endpoint/reporter_test.go index 455d18ada9..753f807d65 100644 --- a/probe/endpoint/reporter_test.go +++ b/probe/endpoint/reporter_test.go @@ -72,7 +72,8 @@ func TestSpyNoProcesses(t *testing.T) { nodeName = "frenchs-since-1904" // TODO rename to hostNmae ) - reporter := endpoint.NewReporter(nodeID, nodeName, false) + revResolver := endpoint.NewReverseResolver(endpoint.RAddrCacheLen) + reporter := endpoint.NewReporter(nodeID, nodeName, false, revResolver) r, _ := reporter.Report() //buf, _ := json.MarshalIndent(r, "", " ") //t.Logf("\n%s\n", buf) @@ -109,7 +110,8 @@ func TestSpyWithProcesses(t *testing.T) { nodeName = "fishermans-friend" // TODO rename to hostNmae ) - reporter := endpoint.NewReporter(nodeID, nodeName, false) + revResolver := endpoint.NewReverseResolver(endpoint.RAddrCacheLen) + reporter := endpoint.NewReporter(nodeID, nodeName, false, revResolver) r, _ := reporter.Report() // buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf) diff --git a/probe/endpoint/resolver.go b/probe/endpoint/resolver.go new file mode 100644 index 0000000000..e9c4b8049d --- /dev/null +++ b/probe/endpoint/resolver.go @@ -0,0 +1,71 @@ +package endpoint + +import ( + "log" + "net" + "time" + + "github.com/bluele/gcache" +) + +// Default cache length +const RAddrCacheLen = 500 + +const raddrBacklog = 1000 +const rAddrCacheExpiration = 30 * time.Minute + +// ReverseResolver is a caching, reverse resolver +type ReverseResolver struct { + addresses chan string + cache gcache.Cache +} + +// NewReverseResolver starts a new reverse resolver that +// performs reverse resolutions and caches the result. +func NewReverseResolver(cacheLen int) *ReverseResolver { + r := ReverseResolver{ + addresses: make(chan string, raddrBacklog), + cache: gcache.New(cacheLen).LRU().Expiration(rAddrCacheExpiration).Build(), + } + + go func() { + rate := time.Second / 10 + throttle := time.Tick(rate) + for address := range r.addresses { + <-throttle // rate limit our DNS resolutions + names, err := net.LookupAddr(address) + if err != nil { + continue + } + if len(names) > 0 { + r.cache.Set(address, names[0]) + } + } + }() + + return &r +} + +// Get the reverse resolution for a +func (r *ReverseResolver) Get(ip net.IP) (string, error) { + address := ip.String() + val, err := r.cache.Get(address) + if err != nil { + if err == gcache.NotFoundKeyError { + // we trigger a asynchronous reverse resolution when not cached + select { + case r.addresses <- address: + default: + } + } else { + log.Printf("could not GET %s from cache: %v", address, err) + } + return "", err + } + return val.(string), nil +} + +// Stop the async reverse resolver +func (r *ReverseResolver) Stop() { + close(r.addresses) +} diff --git a/probe/main.go b/probe/main.go index d636b386a5..2e323e5f6f 100644 --- a/probe/main.go +++ b/probe/main.go @@ -40,6 +40,7 @@ func main() { dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes") dockerBridge = flag.String("docker.bridge", "docker0", "the docker bridge name") weaveRouterAddr = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router") + revResolvLen = flag.Int("raddr.len", endpoint.RAddrCacheLen, "reverse resolution cache length") procRoot = flag.String("proc.root", "/proc", "location of the proc filesystem") captureEnabled = flag.Bool("capture", false, "perform sampled packet capture") captureInterfaces = flag.String("capture.interfaces", interfaces(), "packet capture on these interfaces") @@ -102,14 +103,14 @@ func main() { } } - var ( - taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} - reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)} - processCache *process.CachingWalker - ) - - processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) - reporters = append(reporters, process.NewReporter(processCache, hostID)) + revResolver := endpoint.NewReverseResolver(*revResolvLen) + taggers := []Tagger{newTopologyTagger(), host.NewTagger(hostID)} + processCache := process.NewCachingWalker(process.NewWalker(*procRoot)) + reporters := []Reporter{ + host.NewReporter(hostID, hostName, localNets), + endpoint.NewReporter(hostID, hostName, *spyProcs, revResolver), + process.NewReporter(processCache, hostID), + } if *dockerEnabled { if err := report.AddLocalBridge(*dockerBridge); err != nil {