diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index bd71224b7f..c2fea6facf 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -26,6 +26,7 @@ type Reporter struct { includeNAT bool conntracker *Conntracker natmapper *natmapper + revResolver *ReverseResolver } // SpyDuration is an exported prometheus metric @@ -64,12 +65,16 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo log.Printf("Failed to start natMapper: %v", err) } } + + revRes := NewReverseResolver(rAddrCacheLen) + return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, conntracker: conntracker, natmapper: natmapper, + revResolver: revRes, } } @@ -81,6 +86,7 @@ func (r *Reporter) Stop() { if r.natmapper != nil { r.natmapper.Stop() } + r.revResolver.Stop() } // Report implements Reporter. @@ -145,6 +151,13 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin }) ) + // in case we have a reverse resolution for the IP, we can use it for the name... + if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil { + remoteNode = remoteNode.AddMetadata(map[string]string{ + "name": revRemoteName, + }) + } + if localIsClient { // New nodes are merged into the report so we don't need to do any counting here; the merge does it for us. localNode = localNode.WithEdge(remoteAddressNodeID, report.EdgeMetadata{ @@ -177,6 +190,13 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin }) ) + // in case we have a reverse resolution for the IP, we can use it for the name... + if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil { + remoteNode = remoteNode.AddMetadata(map[string]string{ + "name": revRemoteName, + }) + } + if localIsClient { // New nodes are merged into the report so we don't need to do any counting here; the merge does it for us. localNode = localNode.WithEdge(remoteEndpointNodeID, report.EdgeMetadata{ diff --git a/probe/endpoint/reporter_test.go b/probe/endpoint/reporter_test.go index d3e3fb1075..c7ec419951 100644 --- a/probe/endpoint/reporter_test.go +++ b/probe/endpoint/reporter_test.go @@ -54,7 +54,7 @@ var ( LocalAddress: fixLocalAddress, LocalPort: fixLocalPort, RemoteAddress: fixRemoteAddress, - RemotePort: fixRemotePortB, + RemotePort: fixRemotePort, Proc: procspy.Proc{ PID: fixProcessPID, Name: fixProcessName, diff --git a/probe/endpoint/resolver.go b/probe/endpoint/resolver.go new file mode 100644 index 0000000000..99ca8ff7ff --- /dev/null +++ b/probe/endpoint/resolver.go @@ -0,0 +1,72 @@ +package endpoint + +import ( + "net" + "time" + + "github.com/bluele/gcache" +) + +const ( + rAddrCacheLen = 500 // Default cache length + rAddrBacklog = 1000 + rAddrCacheExpiration = 30 * time.Minute +) + +// ReverseResolver is a caching, reverse resolver +type ReverseResolver struct { + addresses chan string + cache gcache.Cache +} + +// NewReverseResolver starts a new reverse resolver that +// performs reverse resolutions and caches the result. +func NewReverseResolver(cacheLen int) *ReverseResolver { + r := ReverseResolver{ + addresses: make(chan string, rAddrBacklog), + cache: gcache.New(cacheLen).LRU().Expiration(rAddrCacheExpiration).Build(), + } + + go r.run() + return &r +} + +// Get the reverse resolution for an IP address if already in the cache, gcache.NotFoundKeyError otherwise +// Note: it returns one of the possible names that can be obtained for that IP +func (r *ReverseResolver) Get(address string) (string, error) { + val, err := r.cache.Get(address) + if err == nil { + return val.(string), nil + } + if err == gcache.NotFoundKeyError { + // we trigger a asynchronous reverse resolution when not cached + select { + case r.addresses <- address: + default: + } + } + return "", err +} + +func (r *ReverseResolver) run() { + throttle := time.Tick(time.Second / 10) + for address := range r.addresses { + <-throttle // rate limit our DNS resolutions + _, err := r.cache.Get(address) // and check if the answer is already in the cache + if err == nil { + continue + } + names, err := net.LookupAddr(address) + if err != nil { + continue + } + if len(names) > 0 { + r.cache.Set(address, names[0]) + } + } +} + +// Stop the async reverse resolver +func (r *ReverseResolver) Stop() { + close(r.addresses) +} diff --git a/render/detailed_node.go b/render/detailed_node.go index 7459aea7b1..fb1aaa3984 100644 --- a/render/detailed_node.go +++ b/render/detailed_node.go @@ -205,8 +205,11 @@ func OriginTable(r report.Report, originID string, addHostTags bool, addContaine func connectionDetailsRows(topology report.Topology, originID string) []Row { rows := []Row{} - labeler := func(nodeID string) (string, bool) { + labeler := func(nodeID string, meta map[string]string) (string, bool) { if _, addr, port, ok := report.ParseEndpointNodeID(nodeID); ok { + if name, found := meta["name"]; found { + return fmt.Sprintf("%s:%s", name, port), true + } return fmt.Sprintf("%s:%s", addr, port), true } if _, addr, ok := report.ParseAddressNodeID(nodeID); ok { @@ -214,13 +217,13 @@ func connectionDetailsRows(topology report.Topology, originID string) []Row { } return "", false } - local, ok := labeler(originID) + local, ok := labeler(originID, topology.Nodes[originID].Metadata) if !ok { return rows } // Firstly, collection outgoing connections from this node. for _, serverNodeID := range topology.Nodes[originID].Adjacency { - remote, ok := labeler(serverNodeID) + remote, ok := labeler(serverNodeID, topology.Nodes[serverNodeID].Metadata) if !ok { continue } @@ -239,7 +242,7 @@ func connectionDetailsRows(topology report.Topology, originID string) []Row { if !serverNodeIDs.Contains(originID) { continue } - remote, ok := labeler(clientNodeID) + remote, ok := labeler(clientNodeID, clientNode.Metadata) if !ok { continue } diff --git a/report/topology.go b/report/topology.go index a938b7095b..bf237d996b 100644 --- a/report/topology.go +++ b/report/topology.go @@ -102,6 +102,12 @@ func (n Node) WithMetadata(m map[string]string) Node { return result } +// AddMetadata returns a fresh copy of n, with Metadata set to the merge of n and the metadata provided +func (n Node) AddMetadata(m map[string]string) Node { + additional := MakeNodeWith(m) + return n.Merge(additional) +} + // WithCounters returns a fresh copy of n, with Counters set to c func (n Node) WithCounters(c map[string]int) Node { result := n.Copy()